2023 年我做一个 AI 对话产品:用户输入一个问题,后端调大模型,把回答返回到前端展示。第一版我的写法是最直觉的那种:后端收到问题,调模型的接口,等模型把整段回答生成完,拿到完整文本,一次性返回给前端。功能上完全没问题——本地测试,问个简单问题,一两秒就出结果,挺好。可一上线、一问稍微复杂点的问题,体验就糟糕得让我自己都用不下去:用户点了发送,然后屏幕上什么都没有——一个空白的、转着圈的加载状态,持续十几秒,直到模型把三四百字的回答全部生成完,才"啪"地一下整段蹦出来。这十几秒里,用户完全不知道发生了什么:是卡住了?是没网?是这个产品坏了?很多人没等到回答出来,就已经划走了。我一开始以为是模型太慢,想换个更快的模型,可问题的本质不在快慢——大模型生成长文本,本来就是需要时间的,生成三四百字花十几秒,是它的正常速度。我真正错的地方,是我让用户对着这十几秒的过程完全无感。后来我才彻底想明白:你去看 ChatGPT、看任何一个体验好的 AI 产品,它们的回答都不是"等全部生成完再一次性出现"的,而是一个字一个字、像打字一样实时蹦出来的。这不是什么花哨的动画效果——它的背后,是大模型本来就是逐 token 生成的这个事实:模型不是先在内部憋出一整段话再给你,它是一个 token 一个 token 往外吐的。我的第一版代码,是硬生生地等它吐完所有 token、攒成一整段,才返回——白白把"边生成边展示"的机会浪费掉了。要让用户像看 ChatGPT 那样,生成到哪就看到哪,需要的是流式输出:模型每吐出一小段,后端就立刻把这一小段推给前端,前端收到就立刻显示。我以为流式不过是"把返回改成一段一段的",结果真做下来坑一个接一个:用什么协议推、后端接口怎么写、前端怎么接、流到一半出错了怎么办、用户中途关了页面怎么办……那次之后我才认真把流式输出从头搞明白。这篇文章就把它梳理一遍:为什么需要流式、模型为什么本就是逐 token 生成的、SSE 协议怎么回事、后端流式接口怎么写、前端怎么接收,以及把流式输出真正做好要避开的那些坑。
问题背景
先把那次的现象和我的误判讲清楚,后面所有的设计都是冲着纠正这个误判去的。
现象:一个 AI 对话产品,后端"等模型生成完整回答,再一次性返回前端"。问题稍复杂时,模型要十几秒才能生成完,这十几秒里用户对着一个空白加载界面干等,完全不知道进展,大量用户没等到结果就流失了。
我当时的错误认知:"接口就该是'请求—等待—拿到完整结果'。模型慢,那就换个快的模型。"
真相:大模型生成长文本本来就需要时间,这是它的固有特性,换模型只能缓解不能根治。但大模型有一个关键事实被忽略了——它是逐 token 生成的,内容是一点一点产出的,不是憋到最后一次性产出的。"等它全部生成完再返回",等于主动放弃了"边生成边展示"的可能。正确的做法是流式输出:模型每产出一小段,后端就立刻通过一个支持持续推送的通道(通常是 SSE)把它送给前端,前端收到就立刻渲染。用户看到的就从"干等十几秒再看到全部",变成"立刻开始、一个字一个字地看到回答生长出来"。
要把流式输出做好,需要几块认知:
- 为什么"等生成完再返回"体验差,流式到底改变了什么;
- 为什么大模型天然支持流式——它本就是逐 token 生成的;
- SSE 协议是什么,它和普通 HTTP 响应有何不同;
- 后端流式接口、前端流式接收分别怎么写;
- 流式下的错误处理、客户端取消、完整文本收集这些工程坑怎么处理。
一、为什么需要流式:让用户看到"正在生成"
先看清"等生成完再返回"这种模式,差在哪。
它差的不是总耗时——模型生成三四百字要十几秒,流式不流式,这十几秒都省不掉。它差的是用户的感知。非流式模式下,用户经历的是"十几秒纯粹的空白 + 末尾瞬间出现全部";流式模式下,用户经历的是"几百毫秒后第一个字出现,然后回答持续地、可见地生长"。同样是十几秒,前者让人焦虑、怀疑、想放弃,后者让人安心——因为用户全程都能确认"它在工作、在出东西"。在 AI 产品里,这个差别几乎是决定生死的。下面这段代码,就是那个"干等"模式:
from openai import OpenAI
client = OpenAI()
def answer_blocking(question: str) -> str:
# 反面教材:等模型把整段话【全部】生成完,才一次性返回。
resp = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": question}],
)
return resp.choices[0].message.content
# 问题:模型生成 300 字可能要十几秒。这十几秒里,这个函数
# 一直卡在 create() 这一行不返回,前端只能对着空白屏幕干等,
# 一个字都看不到 —— 哪怕模型其实早就生成出前几十个字了。
这段代码的问题,不是它写错了,而是它主动丢掉了一个本来就有的机会。模型在生成第 5 秒时,其实早就产出了回答的前一两百字——这些内容本可以立刻给用户看,可 answer_blocking 把它们全憋在 create() 里,非要等最后一个字也生成完才一起放出来。要利用上这个机会,得先理解一件事:模型的内容,到底是怎么产出来的。
二、模型逐 token 生成:流式不是改造,是顺其自然
很多人以为流式输出是给模型"加"的一个功能,需要某种特殊改造。其实恰恰相反——逐 token 产出,是大模型本来的工作方式。流式不是改造它,而是顺着它本来的样子,不去硬憋。
大模型生成文本的过程,本质是一个 token 接一个 token 地预测:它根据已有的内容,预测下一个最可能的 token,吐出来;再把这个新 token 加进去,预测再下一个……如此循环,直到生成结束。也就是说,模型在生成第 100 个 token 时,前 99 个 token 早就已经产出了。"等生成完再返回"那种模式,是 API 客户端在背后替你把这些 token 一个个接住、攒起来,等齐了才一次性交给你——这个"攒"的动作,是客户端做的,不是模型的限制。
所以开启流式,你要做的只是告诉 API:别替我攒了,每来一个 token(或一小段),就立刻交给我。在 OpenAI 的接口里,这只是一个 stream=True 参数:
def answer_stream(question: str):
"""开启流式:模型每生成一小段,就立刻拿到一个 chunk。"""
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": question}],
stream=True, # 关键:开启流式,不再等全部生成完
)
# stream 是一个可迭代对象,模型每吐一小段,这个循环就转一次
for chunk in stream:
delta = chunk.choices[0].delta.content
if delta: # delta 有时是 None(比如首尾),要判断
yield delta # 每拿到一小段,就立刻往外吐
注意这个函数返回的不再是一个完整字符串,而是一个生成器——它自己也是"流式"的:每从模型那拿到一小段(delta),就 yield 出去一段。调用方可以一边迭代它、一边处理,根本不用等它跑完。模型这一端的流式有了,接下来的问题是:怎么把这个"一段一段往外吐"的流,透过 HTTP 传到前端?普通的 HTTP 响应是"一次性发完整个 body"的,装不了这种"持续地、一段一段发"的内容。这就要用到 SSE。
三、SSE:让服务端能持续往客户端推数据
普通的 HTTP 请求-响应是"一问一答":客户端发一个请求,服务端把完整的响应体一次性发回去,这次交互就结束了。它天生不适合"服务端持续地、分多次往客户端推送"的场景。
SSE(Server-Sent Events,服务端推送事件)就是为这个场景设计的。它本质上还是一个 HTTP 响应,但有两个关键不同:其一,响应头里 Content-Type 是 text/event-stream,告诉客户端"这是一个会持续推的流,别等它结束";其二,这个响应体不是一次性发完,而是分成一条一条的"消息",服务端可以在连接保持打开的状态下,一条接一条地往外发。SSE 的消息格式极简单:每条消息以 data: 开头,以一个空行(两个换行符)结尾。
import json
def to_sse(payload: dict) -> str:
"""把一个数据包装成一条标准的 SSE 消息。"""
text = json.dumps(payload, ensure_ascii=False)
# SSE 规定:消息以 "data: " 开头,以两个换行符结尾。
# 内容用 json 包一层,可以稳妥地容纳换行、特殊字符。
return f"data: {text}\n\n"
# 流结束时,约定发一条特殊消息,明确告诉前端"推完了"。
SSE_DONE = "data: [DONE]\n\n"
为什么用 SSE,而不是大家更熟的 WebSocket?因为这个场景是单向的——只需要服务端往客户端推,客户端不需要在同一个连接上往回说话。SSE 正是为"服务端单向推送"设计的,它就是普通 HTTP,不需要协议升级,实现简单、和现有的 HTTP 基础设施(网关、鉴权)天然兼容。WebSocket 是双向的,功能更强,但对这个单向场景属于"杀鸡用牛刀"。有了 SSE 这个传输形式,后端的流式接口就能写出来了。
四、后端流式接口:把模型的流接到 SSE 上
后端要做的事,就是把第二节那个"逐段吐出的生成器",和第三节那个"SSE 消息格式",接到一起:每从模型生成器拿到一小段,就把它包装成一条 SSE 消息发出去;模型流结束了,就发一条 [DONE]。Web 框架通常提供一个"流式响应"对象来承载它,这里用 FastAPI 的 StreamingResponse:
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
app = FastAPI()
@app.get("/chat")
def chat(question: str):
"""流式接口:边收模型的输出,边用 SSE 往前端推。"""
def event_stream():
# 迭代模型生成器:每拿到一小段,立刻包成 SSE 消息 yield 出去
for delta in answer_stream(question):
yield to_sse({"delta": delta})
# 模型流结束,补一条 [DONE] 让前端知道可以收尾了
yield SSE_DONE
return StreamingResponse(
event_stream(),
media_type="text/event-stream", # 关键:声明这是 SSE 流
)
这段代码的精髓,在 event_stream 是个生成器,而 StreamingResponse 会边迭代它、边把 yield 出来的东西发给客户端。所以整条链路是完全流式打通的:模型吐出一小段 → answer_stream 把它 yield 出来 → event_stream 把它包成 SSE 消息 yield 出去 → StreamingResponse 立刻把这条消息发给前端。中间没有任何一个环节在"攒"。模型生成到第几个字,前端就几乎同步地收到第几个字。和 answer_blocking 那种"全程憋着、最后一次性给"对比,差别一目了然。后端的流通了,最后一步是前端怎么接。
五、前端接收流:边读边渲染
前端接收 SSE 流,要点是不能等响应结束再读——这个响应根本不会"结束",它是持续推的。要用可读流的方式,读一段、处理一段。现代浏览器的 fetch 返回的响应体就是一个可读流,可以拿到一个 reader 持续地读:
async function streamChat(question) {
const resp = await fetch(`/chat?question=${encodeURIComponent(question)}`);
const reader = resp.body.getReader(); // 拿到响应体的可读流
const decoder = new TextDecoder();
let buffer = "";
while (true) {
const { done, value } = await reader.read(); // 读一小块
if (done) break; // 流真正结束了
buffer += decoder.decode(value, { stream: true });
// SSE 消息之间用空行(\n\n)分隔,按它切分逐条处理
const parts = buffer.split("\n\n");
buffer = parts.pop(); // 最后一段可能不完整,留到下次拼
for (const part of parts) {
if (!part.startsWith("data: ")) continue;
const payload = part.slice(6); // 去掉 "data: " 前缀
if (payload === "[DONE]") return; // 收到结束标记
const { delta } = JSON.parse(payload);
appendToScreen(delta); // 把这一小段【立刻】渲染到屏幕上
}
}
}
这段前端代码有个容易被忽略的关键点:网络传来的数据块,和 SSE 的"消息"边界,是不对齐的。你 reader.read() 读到的一块,可能包含两条半 SSE 消息,也可能只有半条。所以这里用一个 buffer 把读到的内容攒着,按 \n\n 切出完整的消息来处理,把最后那段可能不完整的留在 buffer 里、等下一块数据来了再拼。处理每条消息时,拿到 delta 就立刻 appendToScreen——这就是用户看到的"一个字一个字往外蹦"。至此,从模型到屏幕的整条流式链路就通了。但要把它做稳,还有几个坑。
六、工程坑:流式下的错误、取消与完整文本
主干通了,但流式输出有几个非流式时根本不存在的工程坑。
坑 1:流已经开始推之后才出错,没法再改 HTTP 状态码。普通接口出错,你返回个 500 状态码就行。但流式接口,一旦第一个 delta 发出去,HTTP 状态码(200)和响应头早已发给客户端了——此刻模型那边突然报错(比如超时、触发内容审查),你没法再把 200 改成 500。唯一的办法,是在流的内部发一条特殊的"错误消息",让前端自己识别和处理:
def safe_event_stream(question: str):
"""流式里的错误处理:出错后只能在流内部传达,不能改状态码。"""
try:
for delta in answer_stream(question):
yield to_sse({"delta": delta})
except Exception as e:
# 此刻 HTTP 200 和响应头早已发出 —— 改不了状态码了。
# 只能在流里发一条 error 事件,约定好让前端识别并提示用户。
yield to_sse({"error": f"生成中断: {e}"})
finally:
yield SSE_DONE # 无论如何都补一条结束标记
坑 2:用户中途关掉页面,要停止生成,别再白烧 token。用户问了个问题,模型刚生成到一半,用户不耐烦把页面关了。如果你的后端不感知这件事,它会傻乎乎地把整段话生成完——而这些 token 是要花钱的,全白烧了。所以流式接口要检测客户端是否已断开,一旦断开,立刻停止迭代模型:
from fastapi import Request
@app.get("/chat2")
async def chat2(question: str, request: Request):
"""检测客户端断开:用户一关页面,立刻停止生成,不再浪费 token。"""
async def event_stream():
for delta in answer_stream(question):
# 每推一段前,先看客户端还在不在
if await request.is_disconnected():
break # 客户端走了,果断停止生成
yield to_sse({"delta": delta})
yield SSE_DONE
return StreamingResponse(event_stream(), media_type="text/event-stream")
坑 3:流式输出的同时,别忘了把完整文本攒下来。流式是"发完一段就不管了",但你的业务往往还需要那段完整的回答——要落库存进对话历史、要记日志、要做内容审查。所以后端在往外推每一段的同时,要自己也留一份,在流结束时拼成完整文本:
def answer_stream_collected(question: str):
"""边流式输出,边把完整文本攒下来 —— 落库、记历史都要用它。"""
pieces = []
stream = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": question}],
stream=True,
)
for chunk in stream:
delta = chunk.choices[0].delta.content
if delta:
pieces.append(delta) # 自己留一份
yield delta # 同时往外推
full_text = "".join(pieces) # 流结束,这才是完整回答
save_conversation(question, full_text) # 落库、记历史
坑 4:注意中间网关、代理的缓冲。你后端代码写得再"流式",如果中间隔着一层会缓冲响应的网关或反向代理(比如 Nginx 默认会缓冲),那它会把你一段段发出去的内容先攒起来、攒够一批再转发——流式效果就被这层缓冲毁掉了,前端那边又变回"一顿一顿"甚至"一次性"出现。所以部署流式服务时,要确认链路上所有中间层都关掉了对这个接口的缓冲。下面这张图,把流式输出从模型到屏幕的完整链路串起来:
关键概念速查
| 概念 / 手段 | 说明 |
|---|---|
| 非流式的问题 | 等整段回答生成完才返回,用户对着空白干等十几秒,大量流失 |
| 逐 token 生成 | 模型本就是一个 token 接一个 token 产出的,流式是顺其自然 |
| stream=True | 告诉 API 别替你攒,每产出一小段就立刻交给你,返回一个可迭代流 |
| SSE | 服务端推送事件,基于普通 HTTP,服务端可持续分条推送消息 |
| SSE 消息格式 | 每条以 data: 开头、空行结尾;约定 [DONE] 标记流结束 |
| StreamingResponse | Web 框架的流式响应,边迭代生成器边把内容发给客户端 |
| 前端可读流接收 | 用 reader 边读边处理,buffer 攒数据按消息边界切分逐条渲染 |
| 流式错误处理 | 流开始后状态码已发出,出错只能在流内部发 error 事件传达 |
| 客户端断开检测 | 用户关页面后检测断开并停止生成,避免白烧 token |
| 网关缓冲 | 中间代理缓冲响应会毁掉流式效果,要关掉对该接口的缓冲 |
避坑清单
- 非流式"等生成完再返回"省不掉总耗时,却让用户全程对着空白干等,体验极差、流失严重。
- 大模型本就是逐 token 生成的,生成第 100 个时前 99 个早已产出,流式不是改造而是顺其自然。
- 开启流式只需 stream=True,返回的是可迭代流,每产出一小段就能立刻拿到并处理。
- 普通 HTTP 一次性发完整响应体,装不下持续推送;用 SSE,基于普通 HTTP 即可分条持续推。
- SSE 消息以 data: 开头、空行结尾;单向推送场景用 SSE 即可,不必上更重的 WebSocket。
- 后端用 StreamingResponse 承载生成器,整条链路不攒,模型生成到哪前端几乎同步收到哪。
- 前端必须边读边渲染:网络数据块和 SSE 消息边界不对齐,要用 buffer 按空行切出完整消息。
- 流式接口一旦开始推流,状态码和响应头已发出无法再改,出错只能在流内部发 error 事件。
- 要检测客户端断开,用户关页面后立刻停止生成,否则模型会把整段生成完、白白烧掉 token。
- 边推流边自己攒一份完整文本用于落库记历史;并确认中间网关代理没有缓冲毁掉流式效果。
总结
回头看那个"用户问完干等十几秒"的对话产品,以及我后来在流式输出上接连踩的坑,最该记住的不是某一段 SSE 的解析代码,而是我动手前那个想当然的判断——"接口就该是'请求、等待、拿到完整结果'"。这个模式对绝大多数接口都对,可它偏偏不适合大模型。因为大模型的输出有一个别的接口少见的特性:它耗时很长,但又是一点一点产出的。对这种"过程很慢、但过程本身有价值"的输出,硬套"等结果"的模式,就等于把那个缓慢却可见的生成过程,强行压缩成一个用户什么都看不到的黑盒。流式输出做的事,就是把这个黑盒重新打开,让用户看见里面正在发生什么。
所以做流式输出,真正的工程量不在"加一个 stream=True"那一下。开启流式、迭代 chunk,这部分 Demo 里谁都能跑通。真正的工程量在那条"流"经过的每一个环节:模型的流,怎么接到 HTTP 上(SSE)?后端怎么保证整条链路不"攒"、真正一段段往外发?前端怎么处理"网络块"和"消息块"不对齐的问题?流到一半出错,你怎么告诉前端——状态码已经发不出去了?用户关了页面,你的后端知道吗,还是在白烧 token?这篇文章的几节,其实就是顺着这条思路展开的:先想清楚非流式为什么体验差,再看模型为什么天然支持流式,然后是 SSE 协议、后端接口、前端接收这三段主干,最后是错误处理、客户端取消、完整文本收集这几个把流式输出真正做稳的工程细节。
你会发现,流式输出的思路和我们处理任何"耗时长、但有中间产出"的任务的工程经验都是相通的。下载一个大文件,我们会显示进度条,而不是让用户对着空白等到下载完;一个长流程的表单提交,我们会一步步显示"正在校验、正在提交、正在生成",而不是憋一个大转圈。它们的内核是同一个:当一件事注定要花不少时间,就别把这段时间做成一个用户无法感知的黑盒,要把过程本身暴露出来、可视化。大模型的回答就是这样一件事——它的生成过程,本身就是值得展示给用户看的东西。
最后想说,流式做没做扎实,差距永远不会在 Demo 里暴露——Demo 里你问个简单问题,一两秒就出结果,流式不流式几乎看不出区别。它只在真实的用户、真实的复杂问题、真实的长回答面前才显形。那时候非流式会用最难堪的方式给你结账:一个十几秒的空白加载界面,把一批又一批没耐心的用户挡在门外;一个流到一半因为模型超时而永远停住、却连个错误提示都给不出的对话;一笔又一笔花在"用户早就关了页面、模型还在傻傻生成"上的 token 账单。所以别等用户因为"这产品怎么一直转圈"而划走,在你写下第一个调模型的接口时就该想清楚:这个回答可能要生成多久?这段时间我让用户看到进展了吗?流到一半出错,我有办法告诉他吗?他要是中途走了,我的后端停得下来吗?这几个问题都有了答案,你的 AI 产品才不只是 Demo 里那个简单问题跑得通的样子,而是一个无论回答多长,用户都能第一时间看到它"正在生长"的、让人愿意等下去的可靠产品。
—— 别看了 · 2026