2024 年我做一个 AI 对话产品。核心功能很简单:用户输入一个问题,后端调大模型,把答案返回给前端显示。第一版我做得很直接:前端发请求,后端调模型的 chat.completions.create,等模型把答案完整生成完,把这一整段答案一次性返回给前端,前端拿到后整段渲染出来。本地一测——能用,答案也对。可真正给同事试用,问题立刻就来了:同事点了"发送",然后对着一片空白的屏幕,干等。等了 3 秒,屏幕还是空的;等到 8 秒,他以为卡死了,伸手就想刷新页面;直到第 十几秒,答案才"啪"地一下整段蹦出来。他问我:"你这个是不是卡了?"——可它没卡,它只是在老老实实地生成。我盯着这个"等十几秒、然后整段蹦出"的体验想了很久才想明白,第一版错在一个根本的认知上:我以为"等模型生成完,一次性返回,就行了"。这个想法,在逻辑上没错,但它忽略了一件事:大模型生成一段长答案,是要花时间的——它是一个字一个字(更准确说,一个 token 一个 token)往外蹦的,蹦完整段可能要十几秒。而我的第一版,是攒着:模型蹦出来的字,我一个都不给用户看,非要等它全部蹦完,才一次性端上去。在这十几秒里,用户那边什么反馈都没有,他无从判断系统到底是在干活还是已经死了。可你想想:模型在第 1 秒,其实就已经生成出开头的几个字了——这几个字,本可以立刻显示给用户看。正确的做法,是流式输出:模型蹦一个字,我就推一个字给前端,前端蹦一个字,就显示一个字。用户在第 1 秒就能看到答案开始一个字一个字地往外冒——这正是你在所有主流 AI 产品里看到的那个"打字机"效果。它不会让总时间变短,但它让用户从第一刻起就确信"系统在为我工作"。我以为流式不过是"加个参数",结果真做下来,坑一个接一个。这篇文章就把它梳理一遍:为什么"等生成完再返回"体验这么差、流式输出的本质是什么、模型的流式 API 怎么用、怎么用 SSE 把流推给前端、前端怎么接收,以及流的中断、出错、首字延迟这些把流式真正做对要避开的坑。
问题背景
先把那次的现象和我的误判讲清楚,后面所有的设计都是冲着纠正这个误判去的。
现象:一个 AI 对话产品,后端等大模型把答案完整生成完,再一次性返回前端。用户点"发送"后,盯着空白屏幕干等十几秒,中途完全没有任何反馈,常常以为卡死了想刷新页面,直到最后答案整段蹦出。
我当时的错误认知:"等模型生成完,把完整答案一次性返回,这是最简单也最正确的做法。"
真相:大模型是一个 token 一个 token 逐步生成的,生成一整段长答案要十几秒。"等生成完再返回"等于把已经生成好的开头也一起扣住,白白让用户对着空屏幕焦虑。正确的做法是流式输出:模型每生成一小段,就立刻推送给前端显示。技术上,模型 API 提供 stream=True 逐块返回;后端用 SSE(Server-Sent Events) 把这些块持续推给前端;前端边收边渲染,形成"打字机"效果。
要把流式输出做对,需要几块认知:
- 为什么"等生成完再返回"体验差——它扣住了已生成的开头;
- 流式输出的本质——边生成边推送,让反馈尽早到达;
- 模型的流式 API 怎么用,返回的 chunk 长什么样;
- 怎么用 SSE 把流从后端推到前端,前端怎么接收;
- 流的中断、中途出错、首字延迟这些工程坑怎么处理。
一、为什么"等生成完再返回"体验这么差
先把这件最根本的事钉死:大模型生成长文本是逐 token、耗时十几秒的过程;"等全部生成完再返回",等于把早已生成好的开头也一起扣留,让用户在最该获得反馈的前十几秒里一无所知。
下面这段代码,就是我那个"用户对着空屏幕干等"的第一版——它是阻塞式的:
from openai import OpenAI
client = OpenAI()
def chat_blocking(question: str) -> str:
# 反面教材:阻塞式调用,等模型把答案【全部生成完】才返回。
resp = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": question}],
)
# 这一行会一直【卡住】,直到模型生成完整段答案 ——
# 可能是十几秒。这十几秒里,调用方什么都拿不到。
return resp.choices[0].message.content
# 问题:模型在第 1 秒其实已生成出开头几个字,
# 但这里非要等到第 15 秒整段生成完,才一次性交出去。
# 用户那十几秒,对着空屏幕,无从判断系统是死是活。
这段代码没有任何语法错误,逻辑也完全正确——它确实能拿到完整答案。它的问题不在对错,而在一个错误的时间观:它把"生成答案"当成了一个瞬间完成的动作,于是"等它完成再返回"听起来天经地义。可"生成答案"不是一个瞬间动作,它是一个持续十几秒的过程。在这个过程里,答案是一点一点变长的:第 1 秒可能有 10 个字,第 5 秒有 100 个字,第 15 秒才完整。chat_blocking 的做法,相当于守着一个正在出菜的厨房,前面的菜一道道都好了,却偏要等最后一道也上齐,才把整桌菜一起端给客人——客人在空桌子前坐了很久,而其实第一道菜早就能吃了。问题的根子清楚了:你不该攒着,你该好一道、上一道。
二、流式输出的本质:边生成边推送
上一节的死结是:阻塞式调用扣住了那些早已生成好的内容。流式输出(Streaming)的破局点就一句话:不要等,模型生成出多少,就立刻往前端推多少。
它的运作方式,和阻塞式形成鲜明对比。阻塞式是"一锤子买卖":请求-等待-一次性响应,响应只发生一次。而流式是"细水长流":一次请求,后端会持续不断地往回送许多个小数据块,直到答案完整。用户看到的,就是答案一个字一个字往外冒的"打字机"效果。
这里要想清楚一件事:流式并不会让答案生成得更快——模型生成完整段答案,该花十几秒还是十几秒,流式一秒都省不了。它改变的是另一个东西,一个对体验极其关键的指标:首字延迟(从用户发出请求,到他看到第一个字的时间)。阻塞式的首字延迟,等于整段生成的总时间(十几秒)——因为用户要等到最后才一次看到全部。流式的首字延迟,只等于模型生成出头几个字的时间(可能不到 1 秒)。用户的焦虑,几乎全部来自这个"看到第一个字之前"的空白期。流式做的,就是把这个空白期从十几秒压缩到几乎为零。它没有缩短总时长,但它极大地缩短了"感觉上的等待"。理解了这个,剩下的就是工程问题——第一步是:怎么从模型那里,拿到这个"一点一点的流"?
三、模型的流式 API:逐块拿到生成结果
好消息是,主流大模型的 API 原生就支持流式。你不需要自己做什么特殊处理,只要在调用时多加一个参数:stream=True。加上它之后,API 的返回值就变了——它不再是一个完整的响应对象,而变成一个可以迭代的流:你用 for 循环去迭代它,每一轮拿到一个小数据块(chunk),块里装着模型刚刚新生成的那一小段文字。
def stream_from_model(question: str):
"""开启 stream=True,逐块拿到模型生成的内容。"""
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": question}],
stream=True, # 关键:开启流式
)
for chunk in stream: # 迭代这个流,一块一块地拿
# 每个 chunk 里,delta.content 是这一块【新增】的文字。
# 注意:它可能是 None(比如流的第一块或最后一块),要判一下。
delta = chunk.choices[0].delta.content
if delta:
yield delta # 把这一小段文字交出去
这里有个关键细节:每个 chunk 里的 delta.content,是这一块新增的文字,不是到目前为止的全部文字。也就是说,你把每一块的 delta 依次拼接起来,才得到完整答案。还有,delta.content 可能是 None(比如流的第一块往往只带角色信息、最后一块只带结束标记),所以迭代时必须判空,否则会报错。现在,后端已经能一块一块地拿到生成结果了。下一个问题是:怎么把这个"后端的流",同样流式地送到前端?
四、用 SSE 把流推送到前端
后端有了流,要把它持续地送到前端,需要一种支持"服务端持续往客户端推数据"的传输方式。最合适的就是 SSE(Server-Sent Events,服务器发送事件)。SSE 是 HTTP 的一种用法:客户端发起一个普通请求,但这个连接不立刻关闭,服务端可以源源不断地往这条连接里写数据,客户端边收边处理。它天生就是为"服务端推流"设计的,而且比 WebSocket 简单得多。
SSE 的数据有一个固定格式:每一条消息,以 data: 开头,以两个换行符 \n\n 结尾。下面用 Flask 写一个 SSE 接口:
import json
from flask import Flask, request, Response
app = Flask(__name__)
@app.route("/chat/stream")
def chat_stream():
question = request.args.get("q", "")
def event_generator():
# 把模型流里的每一小段,按 SSE 格式("data: ...\n\n")往外发
for delta in stream_from_model(question):
payload = json.dumps({"content": delta}, ensure_ascii=False)
yield f"data: {payload}\n\n" # SSE 单条消息的格式
# 全部发完,再发一条特殊消息,明确告诉前端"结束了"
yield 'data: {"done": true}\n\n'
# mimetype 必须是 text/event-stream,前端才会按 SSE 来解析
return Response(event_generator(), mimetype="text/event-stream")
这段代码里有两个不能漏的点。第一,Response 的 mimetype 必须是 text/event-stream——这是 SSE 的标志,前端和浏览器靠它识别这是一个流。第二,在所有内容发完后,要额外发一条带 done 标记的消息。因为流式响应没有传统响应那种明确的"响应体到此结束",前端无从知道"是真的发完了,还是网络卡了一下"。所以你得自己约定一个"结束信号",显式地告诉前端"到此为止"。后端的流推出去了,接力棒交到前端手里。
五、前端怎么接收并渲染这个流
前端接收 SSE 流,有两种常见方式。第一种是浏览器原生的 EventSource 对象,它专门用来消费 SSE,用起来最简单:
function chatWithEventSource(question) {
// EventSource 是浏览器原生的 SSE 客户端
const es = new EventSource(`/chat/stream?q=${encodeURIComponent(question)}`);
let answer = "";
es.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.done) { // 收到约定的结束信号
es.close(); // 关键:必须手动关闭,否则会一直重连
return;
}
answer += data.content; // 把这一小段拼接到已有答案上
document.getElementById("answer").textContent = answer; // 实时渲染
};
es.onerror = () => {
es.close(); // 出错也要关闭,避免无限重连
};
}
EventSource 简单,但有局限:它只支持 GET 请求,也不能自定义请求头(比如带认证 token 就不方便)。如果需要 POST、需要带请求头,就得用第二种方式——fetch 配合 ReadableStream 手动读取流:
async function chatWithFetch(question) {
const resp = await fetch("/chat/stream", {
method: "POST",
headers: { "Content-Type": "application/json" },
body: JSON.stringify({ q: question }),
});
const reader = resp.body.getReader(); // 拿到流的读取器
const decoder = new TextDecoder();
let answer = "";
while (true) {
const { value, done } = await reader.read(); // 一块一块地读
if (done) break; // 流读完了
const text = decoder.decode(value); // 字节解码成文字
// SSE 每条消息以 \n\n 分隔,逐条解析出来
for (const line of text.split("\n\n")) {
if (line.startsWith("data: ")) {
const data = JSON.parse(line.slice(6));
if (!data.done) {
answer += data.content;
document.getElementById("answer").textContent = answer;
}
}
}
}
}
两种方式,各有取舍:EventSource 省心,但只能 GET、不能带头;fetch + ReadableStream 灵活,支持 POST 和自定义头,但要自己解析 SSE 格式。简单场景用前者,需要鉴权、传复杂参数就用后者。到这里,一条从模型到用户屏幕的完整流式链路就通了。但要把它真正用在生产上,还有几个绕不开的工程坑。
六、工程坑:中断、错误处理与首字延迟
流式链路通了,但有几个工程坑,不处理就会在生产上出事。坑 1:用户中途关页面,后端要能感知并停止。流式响应往往持续十几秒,这期间用户完全可能关掉页面或点了"停止"。如果后端察觉不到,它会继续调模型、继续生成——而这些生成没人要了,纯属烧钱。后端要能感知连接已断开,一旦断开就停止迭代模型流:
@app.route("/chat/stream")
def chat_stream_safe():
question = request.args.get("q", "")
def event_generator():
try:
for delta in stream_from_model(question):
payload = json.dumps({"content": delta},
ensure_ascii=False)
yield f"data: {payload}\n\n"
yield 'data: {"done": true}\n\n'
except GeneratorExit:
# 客户端断开连接时,这个生成器会被关闭,抛出 GeneratorExit。
# 捕获它 = 知道"用户不要了",就此打住,别再调模型烧钱。
print("客户端已断开,停止生成")
raise
except Exception as e:
# 坑 2:流【已经开始】之后才出错,要把错误也当成一条消息发出去
err = json.dumps({"error": str(e)}, ensure_ascii=False)
yield f"data: {err}\n\n"
return Response(event_generator(), mimetype="text/event-stream")
坑 2:流中途出错,不能简单抛 500。这是流式特有的难点。普通接口出错,你可以直接返回 HTTP 500。但流式响应,在出错时 HTTP 状态码早已发出去了(在第一个 chunk 发出时,状态码就是 200 了),你没法再改成 500。所以流开始之后的错误,只能当成流里的一条特殊消息发给前端(上面的 {"error": ...}),由前端识别这条消息并做相应提示。前端的解析逻辑也要相应地处理这个 error 字段:
function handleSSEData(raw, onText, onError, onDone) {
// 前端统一处理一条 SSE 消息:正常内容 / 错误 / 结束,分别对待
const data = JSON.parse(raw);
if (data.error) { // 流中途出错:模型超时、内容被拦截等
onError(data.error); // 给用户一个明确的失败提示
} else if (data.done) { // 收到结束信号
onDone();
} else {
onText(data.content); // 正常的一小段内容,追加渲染
}
}
坑 3:首字延迟,是流式体验的生命线。流式的全部价值,就在于首字延迟短。所以任何挡在"第一个字"前面的耗时操作都要警惕:比如你在调模型之前,先做了一次耗时的数据库查询或 RAG 检索——这些时间会全部加到首字延迟上,流式的优势就被抵消了。能提前做的别拖到请求里,能并行的别串行。坑 4:别忘了把缓冲关掉。有些 Web 服务器或反向代理(如 Nginx)默认会缓冲响应——它会攒一批数据再一起发,这会让你精心设计的流式又变回"一坨一坨"。要在响应头里加 X-Accel-Buffering: no 之类的设置,显式关掉缓冲。坑 5:完整答案要在服务端自己拼。如果你需要把这次对话存进数据库,记住模型是分块发的,前端拿到的是一段段碎片,后端不能依赖前端回传完整答案。正确做法是后端一边推流、一边把每块 delta 拼接成完整答案,流正常结束后再落库:
@app.route("/chat/stream")
def chat_stream_persist():
question = request.args.get("q", "")
def event_generator():
parts = [] # 一边推流,一边把每块攒进来
try:
for delta in stream_from_model(question):
parts.append(delta) # 关键:服务端自己留一份
payload = json.dumps({"content": delta},
ensure_ascii=False)
yield f"data: {payload}\n\n"
# 流【正常走完】才拼接落库 —— 中途断开就不存半截答案
full_answer = "".join(parts)
save_conversation(question, full_answer)
yield 'data: {"done": true}\n\n'
except GeneratorExit:
# 用户中途断开:已生成的半截答案要不要存,按业务定
raise
return Response(event_generator(), mimetype="text/event-stream")
下面这张图,把一次流式问答的完整链路串起来:
关键概念速查
| 概念 / 手段 | 说明 |
|---|---|
| 阻塞式返回的问题 | 等模型整段生成完才返回,扣住了早已生成的开头,用户对着空屏幕干等 |
| 流式输出 | 模型生成多少就立刻推多少,用户看到答案逐字往外冒的打字机效果 |
| 首字延迟 | 从请求到看到第一个字的时间,流式把它从十几秒压缩到几乎为零 |
| stream=True | 模型 API 开启流式,返回值变成可迭代的流,逐块拿到生成内容 |
| delta 是增量 | 每个 chunk 的 delta.content 是新增文字不是全部,要依次拼接,且可能为 None |
| SSE 服务端推送 | 一个连接持续往客户端写数据,格式是 data 开头两个换行结尾 |
| 结束信号 | 流式没有天然的响应结束标志,要自己约定一条 done 消息显式告知 |
| EventSource | 浏览器原生 SSE 客户端,简单但只支持 GET、不能自定义请求头 |
| fetch 读流 | fetch 配合 ReadableStream 手动读流,支持 POST 和自定义头但要自己解析 |
| 流中途出错 | 状态码已发出无法改 500,只能把错误当成流里一条特殊消息发给前端 |
避坑清单
- 别用阻塞式等模型整段生成完再返回,这会扣住早已生成的开头,让用户对着空屏幕干等十几秒。
- 流式输出不会缩短总生成时间,但能把首字延迟从十几秒压到几乎为零,大幅改善体验感受。
- 模型 API 加 stream=True 即开启流式,返回值变成可迭代的流,用 for 循环逐块取。
- 每个 chunk 的 delta.content 是新增增量不是全部,要依次拼接,且它可能为 None 必须判空。
- 用 SSE 推流,响应 mimetype 必须是 text/event-stream,消息格式是 data 开头两个换行结尾。
- 流式没有天然的结束标志,后端必须额外发一条 done 消息,前端才知道是真发完了。
- 前端简单场景用 EventSource,需要 POST 或自定义请求头则用 fetch 配合 ReadableStream。
- 用户中途关页面,后端要捕获 GeneratorExit 感知断开并停止生成,否则会继续调模型烧钱。
- 流开始后才出错无法再返回 500,只能把错误当成流里一条特殊消息发出,由前端识别提示。
- 首字延迟是流式的生命线,别在调模型前堆耗时操作,并关掉 Nginx 等代理的响应缓冲。
总结
回头看那次"用户对着空白屏幕等了十几秒、以为卡死"的事故,以及我后来在流式上接连踩的坑,最该记住的不是某一段 SSE 代码,而是我动手前那个想当然的判断——"等模型生成完,一次性返回就行"。这句话错在它把"生成一段答案"看成了一个不可分割的、瞬间的整体。可它根本不是。一段长答案的生成,是一个有过程、有先后、持续十几秒的事件——开头的字早早就生成好了,结尾的字很晚才出来。我的第一版,做的是一件很别扭的事:它明明在第 1 秒就握着答案的开头,却硬要把它攥在手里,陪着用户一起干等到第 15 秒,才一次性松手。流式输出想清楚的,正是这件事:既然内容是陆续生成出来的,那就陆续地交付出去——生成的节奏,就应该是交付的节奏。不要在中间设一个水库,把流动的水蓄起来,非等蓄满了才开闸放一次。
所以做流式,真正的工程量不在"stream=True 加上去"那一下。那一下,API 帮你做完了。真正的工程量,在于你意识到:流式改变的不只是一个参数,而是整条链路的"形态"。阻塞式是一问一答,流式是一问、多答、再有个结束。这个形态的改变,会顺着链路一路传导,逼着每个环节都跟着变:传输层,你不能再用普通响应,得用 SSE 这种能持续推的;协议上,你得自己造一个"结束信号",因为流没有天然的终点;错误处理上,你得接受一个反直觉的事实——流一旦开始,你就再没机会返回 500 了,出错只能混在流里说;连用户中途反悔这种事,你都得认真处理,否则就是对着空气烧钱。这篇文章的几节,其实就是顺着这条"形态改变"展开的:先想清楚阻塞式差在哪、流式的本质是什么,再看模型流式 API、SSE 推送、前端接收这三段主干,最后是中断、出错、首字延迟这几个把流式真正做对的工程细节。
你会发现,流式输出的思路,和现实里一个好的服务者怎么对待"让人等待"这件事,完全相通。你去一家餐厅,菜要做二十分钟——一个糟糕的餐厅,会让你对着空桌子干坐二十分钟,期间没有任何人理你,你不知道厨房到底在不在做你的菜,你越坐越慌。一个好的餐厅会怎么做?它会先给你上茶水、上小菜,会告诉你"您的菜在做了,大概还要十五分钟",会每道菜一好就先端上来。它并没有把那二十分钟变短——菜该做多久还是多久——但它让你每时每刻都知道"我没有被遗忘,事情在推进"。流式输出做的,就是这件事:它治不了"生成需要时间"这个客观事实,但它能治"等待时的那种被遗弃的焦虑"。技术产品的体验,很多时候就赢在这种地方——不是真的更快,而是让用户感觉到了被尊重、被告知。
最后想说,流式做没做扎实,差距永远不会在 Demo 里暴露——Demo 里你自己问一个简短的问题,模型两秒就答完了,有没有流式看起来差不多。它只在真实的、用户会问复杂问题、模型要生成长答案、网络还时好时坏的生产环境里才显形。那时候它会用最直接的方式给你结账:做不好,你会像我一样,看着用户对着空屏幕一脸茫然,看着他们不耐烦地反复刷新,看着跳出率居高不下——你的产品明明能力不差,却因为"看起来像卡死了"而被用户放弃。而做对了,用户点下"发送"的那一瞬间,答案就开始一个字一个字地、温和地往外流淌,他从第一秒起就清清楚楚地看到"系统在为我认真工作"——哪怕完整答案同样要等十几秒,这十几秒的感受,已经天差地别。所以别等用户"以为卡死了"的反馈找上门,在你写下第一行"调用大模型"的代码时就该想清楚:模型生成这段答案,要花多久?这段时间里,用户那边看得到反馈吗?他能分清"系统在干活"和"系统死了"吗?这几个问题都有了答案,你的 AI 产品才不只是 Demo 里那个答得出问题的样子,而是一个无论问题多复杂、答案多长,都能让用户从第一秒起就安心的可靠产品。
—— 别看了 · 2026