FastAPI 单实例 QPS 上不去:asyncio 隐性阻塞的 5 个真实坑

FastAPI 压测 QPS 320 / p99 2 秒,看代码"全用了 async",py-spy 抓出一处 requests.get 阻了 event loop。本文讲透 asyncio 单线程协作模型 + 5 种隐性阻塞 + 火焰图诊断 + uvloop + asyncpg + 进程池 + 4 倍 QPS 优化路径,附完整代码。

一个 FastAPI 服务接了三方 OCR 接口,压测发现单实例 QPS 上不去,p99 居高不下。看代码"全部用了 async",看 CPU 也没满。最后用 py-spy 抓到罪魁:有一处偷偷的 requests.get 阻塞了 event loop,所有协程都被卡在那。本文把 asyncio 里常见的 5 种隐性阻塞讲清楚,带 py-spy 实战。

事故现场

@app.post("/ocr")
async def ocr_endpoint(file: UploadFile):
    img_bytes = await file.read()

    # 步骤 1: 调三方 OCR
    result = await ocr_client.recognize(img_bytes)        # ✅ 用了 await

    # 步骤 2: 把图压缩存 OSS
    compressed = compress_image(img_bytes)                 # ❌ CPU 密集,阻塞 event loop

    # 步骤 3: 写日志到内部日志服务
    log_to_internal(result)                                # ❌ 用的 requests,阻塞

    return {"text": result.text}

看起来"加了 async 就够",实际上压缩图片和日志服务调用都阻塞了 event loop。一个请求阻 200ms,event loop 这 200ms 啥也干不了,所有其他请求被排队。

asyncio 的核心模型

Python 的 asyncio 是单线程协作式调度:

  [event loop] - [coroutine A] - await io
                 [coroutine B] - await io
                 [coroutine C] - await io
                 ...

event loop 调度所有协程,await 时把控制权还回 loop,loop 选下一个就绪的协程跑。

❗ 一旦协程不 await(纯 CPU 或同步 IO),就独占 event loop
   其他协程必须等它结束才能跑。整个服务的并发度 → 1。

隐性阻塞 1:用了同步 IO 库

# 错:requests 是同步的
import requests
async def fetch_data():
    r = requests.get("https://api.example.com/data")    # 阻塞 200ms
    return r.json()

# 对:用 httpx 或 aiohttp
import httpx
async def fetch_data():
    async with httpx.AsyncClient() as client:
        r = await client.get("https://api.example.com/data")
        return r.json()

常见误用 + 对应正确库:

同步库         异步替代
requests       httpx / aiohttp
psycopg2       asyncpg / aiopg
pymongo        motor
redis-py(同步)  aioredis / redis.asyncio(redis-py 4.2+ 内置)
elasticsearch  elasticsearch-async / async client
boto3          aioboto3
pymysql        aiomysql
kafka-python   aiokafka

隐性阻塞 2:CPU 密集计算

# 错:在协程里直接做 CPU 密集计算
async def process_image(img_bytes):
    img = Image.open(io.BytesIO(img_bytes))
    img.thumbnail((800, 800))               # PIL 缩放,纯 CPU
    output = io.BytesIO()
    img.save(output, format='JPEG', quality=85)
    return output.getvalue()                 # 整个过程阻塞 event loop

# 对:扔到线程池
import asyncio
import concurrent.futures

_thread_pool = concurrent.futures.ThreadPoolExecutor(max_workers=8)

async def process_image(img_bytes):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(_thread_pool, _process_sync, img_bytes)

def _process_sync(img_bytes):
    img = Image.open(io.BytesIO(img_bytes))
    img.thumbnail((800, 800))
    output = io.BytesIO()
    img.save(output, format='JPEG', quality=85)
    return output.getvalue()

更进一步:CPU 密集 + GIL,线程池其实并发不上去(Python GIL 限制单核)。重 CPU 用进程池:

_process_pool = concurrent.futures.ProcessPoolExecutor(max_workers=4)

async def process_image_heavy(img_bytes):
    loop = asyncio.get_event_loop()
    return await loop.run_in_executor(_process_pool, _process_sync, img_bytes)
    # 进程池避免 GIL,真并行

隐性阻塞 3:asyncio.sleep 写成 time.sleep

# 错
async def retry_with_delay():
    for i in range(3):
        try:
            return await call_api()
        except RetryableError:
            time.sleep(1)       # ❌ 阻塞整个 event loop 1 秒
    raise Exception("max retry exceeded")

# 对
async def retry_with_delay():
    for i in range(3):
        try:
            return await call_api()
        except RetryableError:
            await asyncio.sleep(1)     # 让出 event loop
    raise Exception("max retry exceeded")

隐性阻塞 4:同步生成器 + 大数据处理

# 错
async def process_large_file(path):
    with open(path, 'rb') as f:
        data = f.read()                # ❌ 同步读 1GB 文件,阻塞 5 秒
        for line in data.split(b'\n'):
            process(line)
    return "done"

# 对:用 aiofiles + 分块
import aiofiles
async def process_large_file(path):
    async with aiofiles.open(path, 'rb') as f:
        async for line in f:           # 异步迭代
            process(line)
    return "done"

隐性阻塞 5:不知道某些库底层是同步的

# 看起来异步,实际底层用 thread pool 模拟,IO 真发生时仍是同步
async def hash_file(path):
    return await asyncio.to_thread(_hash_sync, path)

# fastapi 的 BackgroundTasks 在主进程里跑,不是真异步
@app.post("/upload")
async def upload(bg: BackgroundTasks):
    bg.add_task(send_notification)  # 不会阻塞响应,但在 event loop 里跑,长任务还是会阻
    return {"ok": True}

# 真的后台任务要用 Celery / Arq / RQ
from arq import create_pool
async def upload_endpoint():
    redis = await create_pool(...)
    await redis.enqueue_job('send_notification', user_id)
    return {"ok": True}

诊断工具:py-spy

# 安装
pip install py-spy

# 抓堆栈采样(60 秒,在 prod 上可以跑,几乎无开销)
py-spy record -o profile.svg --pid 12345 --duration 60

# 火焰图打开看,直接定位哪个函数占了 event loop 大部分时间
# 关键指标:event loop 线程在 read() / send() / time.sleep / requests / json.loads 大数据 等同步调用上的时间

# 也可以 top 模式实时看
py-spy top --pid 12345
#  %CPU  Function                          Module
#  45.2   read                              ssl
#  20.1   loads                             json
#  10.8   compress_image                    app.image_utils
#  ...

# subprocess 模式:不用 attach
py-spy record -o profile.svg -- python -m uvicorn app:app

uvloop + orjson 性能优化

# uvloop:用 Cython 重写的 event loop,比默认快 2-4 倍
import uvloop
import asyncio
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())

# FastAPI 启动加 --loop uvloop
# uvicorn app:app --loop uvloop --workers 4

# orjson:比标准 json 快 5-10 倍,用 Rust 写
import orjson
from fastapi.responses import ORJSONResponse

@app.get("/users", response_class=ORJSONResponse)
async def get_users():
    return {"data": [...]}

# 替换默认 response_class
app = FastAPI(default_response_class=ORJSONResponse)

asyncpg 连接池配置

import asyncpg
from contextlib import asynccontextmanager

pool: asyncpg.Pool

@asynccontextmanager
async def lifespan(app: FastAPI):
    global pool
    pool = await asyncpg.create_pool(
        dsn="postgresql://user:pass@db/mydb",
        min_size=10,             # 启动就有 10 个连接
        max_size=50,             # 高峰最多 50
        max_queries=50_000,      # 单连接处理 5w 个 query 后回收
        max_inactive_connection_lifetime=300,  # 空闲 5 分钟回收
        command_timeout=30,      # 单 query 超时 30 秒
    )
    yield
    await pool.close()

app = FastAPI(lifespan=lifespan)

@app.get("/user/{uid}")
async def get_user(uid: int):
    async with pool.acquire() as conn:
        row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", uid)
        return dict(row) if row else {"error": "not_found"}

压测对比

FastAPI + 同样的业务逻辑,wrk -t 4 -c 200 -d 30s

配置                                  QPS       p99 延迟
=================================================
- requests 同步 + sync sleep + PIL         320     2100ms
+ 换 httpx                                950      800ms
+ 用 asyncpg 而不是 psycopg2              1400      550ms
+ PIL 扔线程池                            2100      280ms
+ uvloop                                  3400      150ms
+ orjson                                  3800      130ms
+ 4 worker(gunicorn)                     14500       90ms

总提升:45 倍 QPS,延迟降到 4%

结构化排查路径

  1. 装 py-spy 抓火焰图,确认是不是有同步函数在 event loop 上
  2. 检查所有 IO 调用(DB / Redis / HTTP / Kafka)是不是用的异步库
  3. 检查所有 CPU 密集函数(图片处理 / 加密 / 压缩 / JSON 大对象)有没有 run_in_executor
  4. time.sleep 全局替换为 asyncio.sleep
  5. 开 uvloop + orjson
  6. 合理的 worker 数(一般 = 2 * CPU 数 + 1)
  7. 数据库连接池 size 配合 worker 数

团队最后做的代码规范

# 1. 禁止在 async def 里调用同步 IO 库
# CI 检查:grep -E "async def" -A 50 *.py | grep -E "requests\.(get|post)|psycopg2|time\.sleep"
# 命中即拒绝合并

# 2. 重 CPU 操作必须扔 executor,用装饰器统一
def run_in_thread(func):
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        loop = asyncio.get_event_loop()
        return await loop.run_in_executor(None, lambda: func(*args, **kwargs))
    return wrapper

@run_in_thread
def heavy_compute(data):
    # 这里可以放任何 CPU 操作
    return process(data)

# 3. 每个新接口必须跑 py-spy 60 秒看火焰图,event loop 占用不能超 5%

asyncio 是 Python 服务高并发的命脉,但隐性阻塞太容易写错了。一处同步 IO 就能让全体协程一起趴下。装好 py-spy,养成"每个新接口看一眼火焰图"的习惯,事故率能降一个数量级。我们这套规范跑了一年半,服务的 p99 从最早的 2 秒降到 100ms 以内,worker 数还省了一半。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

JWT 双 Token 续期 + 多设备登录 + 强制撤销:生产级 Spring Boot 实现

2026-5-19 11:24:09

技术教程

Kafka 顺序消费的 7 层防线:订单状态错乱事故复盘

2026-5-19 11:28:17

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索