2024 年我们公司做了一个客服 AI 把 OpenAI 的 GPT-4 集成进客服后台 客户咨询 自动回复 一线客服只处理 AI 无法解决的复杂案例。从原型 demo 看一切完美 但真正上线后我们陆续踩了一堆坑。第一种最让我傻眼 第一天上线峰值 1000 QPS 我们直接被 OpenAI 限流 429 错误满天飞 一半客户咨询响应失败 后端日志里全是 RateLimitError。第二种最难缠 同一个 model 同一个 prompt 调用平均延迟 1.2 秒 P99 居然 8 秒 客户咨询响应慢用户掉头就走 SLA 完全做不到。第三种最离谱 月初看账单 OpenAI 一个月烧了 18 万美元 业务侧才赚 12 万 净亏 6 万 老板要砍项目。第四种最致命 我们某个 prompt 模板里漏了一个变量替换 直接把 user_id 当成 prompt 内容发给 OpenAI 数据泄露给上游 LLM 服务商 客户隐私合规事故。第五种最莫名其妙 同一个用户连续问 10 个相关问题 每次都重新发完整 system prompt + 历史 token 浪费严重 实际有效内容只占 token 的 20% 其他都是重复。我盯着这一连串问题想了很久才彻底想明白第一版错在一个根本的认知上我以为 LLM API 就是 HTTP 接口 想调就调 想多并发就多并发 价格按 token 算就 OK 可这个认知是错的真正能扛业务的 LLM API 集成是一个 速率限制与并发控制 加 多模型路由与降级 加 prompt cache 加 流式输出 加 异步队列 加 成本监控与告警 加 数据合规 的整套工程方法论 任何一环没做都可能在某次流量峰值或者月底账单里让你的产品停摆或者公司亏钱本文从头梳理 LLM API 集成的工程化要点 速率限制怎么做 prompt cache 怎么用 流式输出怎么集成 多模型怎么路由 成本怎么控 数据合规怎么保 以及一些把 LLM API 用扎实要避开的工程坑
问题背景:为什么调 OpenAI API 不是写个 requests
很多人对 LLM API 的认知是 拿个 SDK 调一下 就跟普通 HTTP API 一样 但生产里你会发现 限流 延迟 成本 合规 缓存命中率每一项都跟普通 API 不太一样。问题的根源在于:
- LLM API 是按 token 计费:不像传统 API 按调用次数 同样调用次数 token 用量差 10 倍价格差 10 倍 必须精算每次 token 消耗。
- 限流不只是 RPM:OpenAI 同时按 RPM 请求数和 TPM token 数限流 大 prompt 调用很快撞到 TPM 限制。
- 延迟有内禀下限:LLM 推理本身就慢 GPT-4o 平均 1-2 秒 GPT-4 turbo 平均 3-5 秒 想做实时业务必须用流式输出。
- 成本可以快速失控:一个 bug 让 prompt 多了 10K token 一天烧出 5 万美元 必须有实时成本监控。
- 多模型组合是常态:不同任务用不同模型 简单分类用 mini 复杂推理用 4o 必须有智能路由。
- 数据合规不能马虎:发给 OpenAI 的内容默认会被用来训练 必须开 zero retention 或用 Azure OpenAI 做合规层。
一 速率限制:RPM 与 TPM 双重约束
OpenAI 的限流是按账户 tier 分级 tier1 一般 500 RPM 30 万 TPM 高 tier 才能到几千 RPM。突破限流的核心策略是 客户端限流 加 重试 加 多 key 轮转 加 升级 tier。
import asyncio
import time
from collections import deque
from openai import AsyncOpenAI, RateLimitError
class TokenBucket:
def __init__(self, rpm: int, tpm: int):
self.rpm = rpm
self.tpm = tpm
self.request_log = deque() # (timestamp, tokens)
self.lock = asyncio.Lock()
async def acquire(self, estimated_tokens: int):
async with self.lock:
now = time.time()
cutoff = now - 60
while self.request_log and self.request_log[0][0] < cutoff:
self.request_log.popleft()
current_rpm = len(self.request_log)
current_tpm = sum(t for _, t in self.request_log)
if current_rpm + 1 > self.rpm or current_tpm + estimated_tokens > self.tpm:
wait = self.request_log[0][0] + 60 - now if self.request_log else 1
await asyncio.sleep(max(0.1, wait))
return await self.acquire(estimated_tokens)
self.request_log.append((now, estimated_tokens))
TokenBucket 同时维护 RPM 和 TPM 两个滑动窗口 acquire 时检查两个上限 任何一个不够就等待。下面是 LLM 调用层 把 bucket 串联进 chat completion 调用 同时配合指数退避重试 应对偶尔的 429 突刺。
class RateLimitedLLM:
def __init__(self, api_key: str, rpm: int = 450, tpm: int = 270000):
# 留 10% buffer 避免冲到红线
self.client = AsyncOpenAI(api_key=api_key)
self.bucket = TokenBucket(rpm, tpm)
async def chat(self, messages: list, model: str = 'gpt-4o-mini',
max_tokens: int = 1000):
estimated = sum(len(m['content']) // 3 for m in messages) + max_tokens
await self.bucket.acquire(estimated)
for attempt in range(5):
try:
resp = await self.client.chat.completions.create(
model=model, messages=messages, max_tokens=max_tokens,
)
return resp.choices[0].message.content
except RateLimitError as e:
wait = (2 ** attempt) + 0.1
await asyncio.sleep(wait)
raise RuntimeError('rate limit exceeded after retries')
客户端限流的关键是 留 10% buffer 因为 OpenAI 限流不是严格平滑窗口 偶尔会比设定值更严。我们生产里设 rpm 450 配合 500 RPM tier 实测稳定 偶尔的小突刺由重试兜底。多 key 轮转适合超过单 tier 限制的场景 把请求按 user_id hash 分到 3-5 个 key 等效把限额翻倍翻三倍。
二 Prompt Cache:重复部分自动缓存
OpenAI 和 Anthropic 都支持 prompt caching 同一个 system prompt 在 5 分钟内重复调用 cached 部分按 25-50% 价格计费 延迟也降 30-50%。但 cache 有 严格的顺序要求 必须 prefix 完全一致才命中 cache。
from openai import OpenAI
client = OpenAI()
# 推荐的 prompt 结构 cache 友好
# 1. system prompt 静态 长 这部分被 cache
# 2. 工具定义 静态 这部分也被 cache
# 3. 历史对话 半静态 增量 cache
# 4. 用户当前问题 动态 不 cache
SYSTEM_PROMPT_TEMPLATE = """你是 XX 公司的客服助手 专注于回答订单 退货 物流相关问题。
公司政策:
- 7 天无理由退货
- 30 天质保
- 包邮门槛 99 元
[... 后面还有几千 token 的政策与 FAQ ...]
回答要求:
- 简短直接
- 不确定的转人工
- 不要承诺超出政策的内容
"""
def chat_with_cache(user_question: str, history: list = None):
messages = [
{
'role': 'system',
'content': SYSTEM_PROMPT_TEMPLATE, # 这部分被 cache
},
]
if history:
messages.extend(history)
messages.append({'role': 'user', 'content': user_question})
resp = client.chat.completions.create(
model='gpt-4o',
messages=messages,
)
usage = resp.usage
# OpenAI 返回的 usage 里有 prompt_tokens_details.cached_tokens
cached = getattr(usage.prompt_tokens_details, 'cached_tokens', 0) if hasattr(usage, 'prompt_tokens_details') else 0
print(f'total={usage.prompt_tokens} cached={cached} hit_rate={cached/max(usage.prompt_tokens,1):.1%}')
return resp.choices[0].message.content
# Anthropic 的做法稍微不同 显式标记 cache_control
def claude_with_cache(user_question: str):
import anthropic
client = anthropic.Anthropic()
resp = client.messages.create(
model='claude-3-5-sonnet-20241022',
max_tokens=1024,
system=[
{
'type': 'text',
'text': SYSTEM_PROMPT_TEMPLATE,
'cache_control': {'type': 'ephemeral'}, # 显式 cache
},
],
messages=[{'role': 'user', 'content': user_question}],
)
return resp.content[0].text
prompt cache 的工程价值不只是省钱 更是延迟 cached 部分服务端不用重新 forward 一次 减少 50% TTFT time-to-first-token 用户体验大幅改善。我们的客服 AI 上 prompt cache 后 月成本从 18 万降到 6 万 P99 延迟从 8 秒降到 3 秒 一个动作两个收益。
三 流式输出:体验与延迟的平衡
LLM 推理本身慢 但用户能接受 因为 ChatGPT 的体验告诉大家 一边生成一边显示 不会等到全部生成完才看到 这就是流式输出 stream。后端用 SSE 或 WebSocket 把 token 一边生成一边推给前端。
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import AsyncOpenAI
import json
app = FastAPI()
client = AsyncOpenAI()
@app.post('/chat')
async def chat_stream(req: Request):
body = await req.json()
messages = body['messages']
async def event_stream():
try:
stream = await client.chat.completions.create(
model='gpt-4o',
messages=messages,
stream=True,
stream_options={'include_usage': True},
)
async for chunk in stream:
if chunk.choices and chunk.choices[0].delta.content:
data = {'delta': chunk.choices[0].delta.content}
yield f'data: {json.dumps(data, ensure_ascii=False)}\n\n'
if chunk.usage:
data = {'usage': {
'prompt': chunk.usage.prompt_tokens,
'completion': chunk.usage.completion_tokens,
}}
yield f'data: {json.dumps(data)}\n\n'
yield 'data: [DONE]\n\n'
except Exception as e:
err = json.dumps({'error': str(e)})
yield f'data: {err}\n\n'
return StreamingResponse(event_stream(), media_type='text/event-stream')
客户端 JavaScript 接收流式输出:
async function streamChat(messages) {
const resp = await fetch('/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages }),
});
const reader = resp.body.getReader();
const decoder = new TextDecoder();
let buffer = '';
while (true) {
const { done, value } = await reader.read();
if (done) break;
buffer += decoder.decode(value, { stream: true });
const lines = buffer.split('\n\n');
buffer = lines.pop();
for (const line of lines) {
if (!line.startsWith('data: ')) continue;
const payload = line.slice(6);
if (payload === '[DONE]') return;
try {
const data = JSON.parse(payload);
if (data.delta) appendToUI(data.delta);
if (data.usage) recordCost(data.usage);
if (data.error) showError(data.error);
} catch (e) {
console.error('parse error', e);
}
}
}
}
流式输出的工程坑是 中间断流不好处理 因为客户端可能因为网络波动 浏览器切后台等各种原因断开 后端没法知道用户还在不在 必须有 idle timeout 加客户端重连机制。另外流式输出的 TTFT 通常 500ms 左右 比非流式整个等完 5-8 秒体验提升巨大 任何用户面对面的场景都应该用流式。
四 多模型路由:成本与质量的最优解
不是所有任务都需要 GPT-4 简单任务用 mini 复杂任务用 4o 反思任务用 o1 这是成本最优的策略。一个智能路由层根据任务复杂度 用户等级 历史性能动态选模型 比单一模型省 60-80% 成本。
from openai import AsyncOpenAI
class ModelRouter:
MODEL_PROFILES = {
'gpt-4o-mini': {'input_cost': 0.00015, 'output_cost': 0.0006, 'tier': 'fast'},
'gpt-4o': {'input_cost': 0.0025, 'output_cost': 0.01, 'tier': 'balanced'},
'o1-mini': {'input_cost': 0.003, 'output_cost': 0.012, 'tier': 'reasoning'},
'o1': {'input_cost': 0.015, 'output_cost': 0.06, 'tier': 'reasoning-pro'},
}
def __init__(self):
self.client = AsyncOpenAI()
def classify_task(self, messages: list) -> str:
text = messages[-1]['content'].lower()
# 简单 keyword 路由 实战可以用一个 mini 模型分类
if any(w in text for w in ['物流', '快递', '订单状态', '客服电话']):
return 'simple'
if any(w in text for w in ['退货政策', '质保', '比较', '推荐']):
return 'medium'
if any(w in text for w in ['算', '为什么', '推理', '分析']):
return 'complex'
return 'medium'
async def chat(self, messages: list, force_model: str = None) -> dict:
task_type = self.classify_task(messages)
model = force_model or {
'simple': 'gpt-4o-mini',
'medium': 'gpt-4o',
'complex': 'o1-mini',
}[task_type]
resp = await self.client.chat.completions.create(
model=model, messages=messages, max_tokens=1000,
)
profile = self.MODEL_PROFILES[model]
cost = (resp.usage.prompt_tokens * profile['input_cost']
+ resp.usage.completion_tokens * profile['output_cost']) / 1000
return {
'content': resp.choices[0].message.content,
'model': model,
'task_type': task_type,
'cost_usd': round(cost, 5),
}
[mermaid]flowchart TD
A[用户问题进入] --> B[速率限制 acquire]
B --> C[任务分类 classifier]
C -->|简单| D[gpt-4o-mini]
C -->|中等| E[gpt-4o]
C -->|复杂推理| F[o1-mini]
D --> G[prompt cache 命中]
E --> G
F --> G
G --> H[流式输出]
H --> I[实时 token 计费]
I --> J[告警阈值检查]
J --> K[返回客户端]
路由层的关键是 监控每个模型的实际表现 用户满意度 错误率 平均延迟 这些数据反过来调整路由策略 不要写死规则。我们的路由层 每周自动跑一次回归 用人工标注的 500 条测试集 测每个模型的命中率 看是不是要调整规则。
五 数据合规:不要把客户数据喂给 LLM
把客户数据发给 OpenAI 这事 合规风险很大 默认 OpenAI 会保留数据 30 天用于滥用监控 部分历史版本还可能用于训练。生产里必须做数据脱敏 + zero retention 协议 + 区域合规。
import re
class PIIScrubber:
PATTERNS = {
'phone': re.compile(r'1[3-9]\d{9}'),
'email': re.compile(r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'),
'id_card': re.compile(r'\d{15}|\d{17}[\dXx]'),
'bank_card': re.compile(r'\d{16,19}'),
'address': re.compile(r'[一-鿿]{2,4}(?:省|市|区|县)[一-鿿0-9]{2,20}(?:街|路|号)'),
}
@classmethod
def scrub(cls, text: str) -> tuple:
replacements = {}
out = text
for tag, pat in cls.PATTERNS.items():
for i, m in enumerate(pat.finditer(text)):
placeholder = f'[{tag.upper()}_{i}]'
replacements[placeholder] = m.group()
out = out.replace(m.group(), placeholder, 1)
return out, replacements
@classmethod
def restore(cls, text: str, replacements: dict) -> str:
for placeholder, original in replacements.items():
text = text.replace(placeholder, original)
return text
有了 PIIScrubber 在调用 LLM 前先 scrub 把原始敏感串换成占位符 LLM 看到的是 [PHONE_0] [EMAIL_0] 这种 LLM 输出后再用 restore 把占位符还原成原始内容 用户看到的是正常的回复 但 LLM 服务商那边永远不会看到真实敏感数据。
# 调 LLM 前脱敏 LLM 输出后还原
def chat_with_pii_protection(user_question: str):
scrubbed, mapping = PIIScrubber.scrub(user_question)
answer = call_llm(scrubbed)
return PIIScrubber.restore(answer, mapping)
合规层面除了脱敏 还要确认 LLM 服务商的数据政策 OpenAI 企业版 zero retention Azure OpenAI 区域部署 都是合规友好的选择 国内业务必须选符合数据出境规定的方案。我们公司金融业务直接用 Azure OpenAI 部署在新加坡 region 客户数据不出境 同时签 BAA business associate agreement 合规审计能过。
六 LLM API 集成的工程坑:那些文档里学不到的
讲完原理来说几个真实生产里踩过的坑。第一个坑是 token 估算不准 用 len(text)//3 是粗略估算 中文混英文混代码差很多 必须用 tiktoken 精确计数 否则容易超 max_tokens 调用失败。第二个坑是 stream 错误处理 流式调用中途出错 比如 content policy 违规 你已经吐出去半句话给用户了 必须有断流标记和回滚机制。第三个坑是 模型升级静默漂移 你用 gpt-4o alias 不锁版本 OpenAI 发新版本时性能可能变 必须用 gpt-4o-2024-08-06 这种 snapshot 版本。第四个坑是 timeout 设置 默认 httpx timeout 5 分钟 极端长 prompt 可能超过 必须显式设 timeout 加客户端 cancel 机制。第五个坑是 多账户管理 你为了高并发用 5 个 OpenAI key 但用量监控 错误率监控 成本核算都要分账户 否则一个 key 出问题影响整个服务 必须有完整的 multi-tenant 管理。
关键概念速查
| 概念 | 含义 | 工程价值 |
|---|---|---|
| RPM TPM | 请求与 token 双限 | 客户端必须双重控制 |
| Token Bucket | 客户端限流 | 避免冲红线 |
| Prompt Cache | 系统 prompt 缓存 | 省 50-75% 成本 |
| 流式输出 | token 边生成边推 | TTFT 降到 500ms |
| 多模型路由 | 按任务选模型 | 省 60-80% 成本 |
| PII Scrubber | 敏感信息脱敏 | 合规必备 |
| Zero Retention | 数据不留存 | 企业版协议 |
| tiktoken | 精确 token 计数 | 替代字符估算 |
| snapshot 模型 | 锁版本 | 避免静默漂移 |
| 断流处理 | stream 异常 | UX 兜底 |
避坑清单
- 客户端必须做 RPM TPM 双重限流 留 10% buffer 不要把限额吃满。
- 多 key 轮转扩 RPM 高峰必备 按 user_id hash 分流 5 个 key 等效 5 倍限额。
- 静态 system prompt 必须开 prompt cache 5 分钟内重复调用降 50% 成本。
- 用户面对面场景必须用 stream TTFT 从 5 秒降到 500ms 体验天差地别。
- 多模型路由按任务复杂度分配 简单 mini 复杂 4o 推理 o1 省 60% 成本。
- 客户敏感数据必须脱敏后再调 LLM PII Scrubber 是合规底线。
- 企业版协议必须 zero retention 30 天保留也是合规风险。
- token 计数用 tiktoken 不要用字符估算 中英混合差异大。
- 模型必须锁 snapshot gpt-4o-2024-08-06 不要用 alias 静默漂移。
- 成本监控实时跑 单日超阈值立即报警 一个 bug 能一夜烧出 5 万美元。
总结
LLM API 集成这事 很多人的直觉是 调个 HTTP 接口而已 跟以前调微信 API 调 Stripe 一样简单 可这其实是把 我会调 chat.completions 和 我能在生产用 LLM API 扛住高并发低延迟低成本合规 混为一谈。前者是会用 SDK 后者是懂 LLM 集成工程。中间隔着的是 速率限制 prompt cache 流式输出 多模型路由 成本控制 数据合规 整整一套工程方法论。
从原型到生产 你需要做的事远不止 调一个 API。你要懂 RPM TPM 双重限流 要会做 prompt cache 要集成流式输出 要设计多模型路由 要做数据脱敏 要锁定模型版本 要监控实时成本 要处理流式错误。每一项单独看都不复杂 但它们组合在一起 才是一个能在生产扛得住的 LLM 集成层。少任何一项 都可能在某次流量峰值或者月底账单里 让你的产品停摆或者公司亏钱。
我经常用一个比喻来理解 LLM API 集成 它有点像跟一家超牛但很贵的外包团队合作。LLM 是外包团队 API 是合同接口 RPM TPM 是合同里的工作量上限 prompt cache 是给团队的标准化背景资料避免每次重新介绍 流式是分阶段交付不要等整个项目完工 多模型路由是把不同难度的活分给不同级别的人 数据脱敏是不能把公司机密丢给外包看 zero retention 是合同里的保密条款。你不能因为外包牛就什么都丢过去 还要管工作量上限 标准资料 分阶段交付 任务分发 保密管理 这才是一整套外包合作工程。
这套架构最难的地方在于 它的复杂度在小流量 demo 时几乎完全暴露不了。你写个 demo 调 OpenAI 一切顺利 觉得 LLM 集成挺简单。但真正放到生产 1000 QPS 高峰 各种长 prompt 各种特殊用户 各种边角合规需求 你才发现 99% 的复杂度都在 那 1% 的极端 case 里 限流 prompt cache miss 流式断流 成本失控 数据泄露 模型漂移。建议任何想做 LLM API 集成的团队 上线前一定要做 高并发演练 故意打到限额 故意触发 timeout 故意造大 prompt 看系统表现如何 千万别等真实流量来教你 那时候账单可能已经烧到老板要砍项目的程度了。
—— 别看了 · 2026