大模型流式输出完全指南:从"转圈圈"到打字机效果的工程实现

2024 年初我做了一个接入大模型的 AI 对话产品,第一版用最直白的方式:前端发普通 HTTP 请求,后端调大模型等【完整回答】生成好一次性返回。本地测短问题没事,一上线用户开始抱怨"卡"——问个复杂问题让模型写长解释,屏幕一片空白干等十几秒,我自己都怀疑服务挂了,直到第十几秒一整段几百字凭空出现。我忽然懂了那个"卡"不是真卡,是那十几秒纯空白等待让人本能以为程序死了。别家成熟 AI 产品的回答像打字机一样一个词一个词流出来,同样十几秒一个是对着空白干等、一个是看着答案一点点冒出来,体验天壤之别。梳理:大模型本就逐 token 生成,回答第一个字远在整段完成前就好了,方案 A 错在硬等全部生成完才返回、白白浪费了边生成边返回的天然可能;流式模式下决定体验的不再是总耗时而是首 token 延迟 TTFT。选型:轮询能模拟但笨(空跑浪费、有延迟),WebSocket 全双工对单向推场景过重,SSE 基于普通 HTTP 单向推浏览器原生支持、为此场景量身定做。后端当中转管道三步:stream=True 调大模型拿到可迭代的流、每收到一小块立刻按 data: 格式写出并 flush 千万别攒着、发 data:

2024 年初,我做了一个接入大模型的 AI 对话产品。第一版,我用的是最直白的方式:前端发一个普通的 HTTP 请求,后端调大模型的接口,等模型把【完整的回答】生成好,一次性返回,前端再渲染出来。本地测试时,问题不明显——我问的都是短问题,模型一两秒就答完了。可一上线,用户开始抱怨"卡"。我自己复现:问一个稍微复杂点的问题,让模型写一段长一点的解释。然后,我对着屏幕,开始等。一秒、两秒……五秒……屏幕上【一片空白】,什么都没有。十秒过去,我自己都开始怀疑是不是服务挂了。直到第十几秒,"啪"地一下,一整段几百字的回答,凭空出现在屏幕上。我盯着这个过程,忽然就懂了用户说的"卡"是什么意思:不是真的卡,是那十几秒的【纯空白等待】,让人本能地以为程序死了。我去翻了一下别家成熟的 AI 产品——它们没有一个是这样的。它们的回答,是像打字机一样,一个词一个词、平滑地"流"出来的;你几乎是在模型"想"出来的同时,就看到了它。同样十几秒,一个是"对着空白屏幕干等十几秒",一个是"看着答案一点点冒出来十几秒"——体验是天壤之别。这件事逼着我把大模型为什么天然适合流式输出、为什么流式要选 SSE 而不是别的、后端怎么把模型的流转发出去、前端怎么消费、以及工程上的那些坑,彻底理清了一遍。本文是这份梳理的完整复盘。

问题背景:一个"对着空白屏幕干等十几秒"的对话产品

环境:一个接入大模型的 AI 对话产品
我的方案 A(第一版):
- 前端发普通 HTTP 请求 -> 后端调大模型 -> 等【完整回答】生成完
  -> 一次性返回 -> 前端渲染
遇到的问题:
- ★ 短问题没事;问一个复杂问题,模型要生成长回答
- ★★ 用户对着【一片空白】的屏幕,干等十几秒 ——
     本能地以为"程序卡死了 / 服务挂了",大量流失
- ★ 别家成熟 AI 产品:回答像打字机一样一个词一个词"流"出来,
     模型刚"想"出来,你几乎同时就看到了

★★ 同样十几秒:一个是"对着空白屏幕干等",一个是"看着答案
   一点点冒出来" —— 用户的感知体验,天壤之别。

★ 关键事实:大模型本来就是【一个 token 一个 token】生成的。
  方案 A 的错,是【硬等它全部生成完】才返回 —— 白白浪费了
  "边生成边返回"的天然可能。

★ 本文要做的:把流式输出为什么必要、为什么选 SSE、后端
  前端怎么实现、以及工程上的坑,彻底讲透。

为什么要流式:大模型本就是"一个字一个字"蹦出来的

# === ★ 先认清一个事实:大模型是怎么"生成"回答的 ===

# === ★ 大模型的生成,是【逐 token】进行的 ===
# ★ ★ 大模型不是"一下子想出整段答案"。它的工作方式是:
#   预测下一个 token(可粗略理解为一个词 / 一个字),
#   把它接到已有文本后面,再预测下一个 —— 如此循环,
#   一个一个地往外蹦,直到生成结束。
# ★ ★ 也就是说:回答的【第一个字】,远在【整段回答】
#   完成之前,就已经生成好了。一段 300 字的回答,
#   可能要 10 秒;但第一个字,1 秒内就出来了。

# === ★★ 方案 A 错在哪:它把"已经能给的",憋着不给 ===
# ★ ★ 普通请求模式(方案 A):模型这边一个字一个字
#   生成着,但后端【死死等着】,非要等最后一个字也
#   生成完,才把整段一次性返回。
# ★ ★★ 这就浪费了一个巨大的、免费的优化空间:那第
#   1 秒就生成好的第一个字,本可以立刻送到用户眼前,
#   却被硬生生憋了 10 秒。用户那 10 秒的"空白焦虑",
#   完全是【人为制造】出来的。

# === ★ 流式输出:边生成,边返回 ===
# ★ ★ 流式(streaming)的思路特别朴素:模型每蹦出一
#   小块文字,后端就【立刻】把这一小块,顺着一个一直
#   开着的通道,推给前端;前端收到一块就渲染一块。
# ★ ★ 效果:用户在【第 1 秒】就看到了回答的开头,然后
#   看着它像打字机一样平滑地长出来。总时长没变,但
#   "可感知的等待",从十几秒,压缩到了一秒。

# === ★ 一个关键指标:首 token 延迟(TTFT)===
# ★ ★ 流式模式下,真正决定体验的,不再是"总耗时",
#   而是【首 token 延迟】(Time To First Token)——
#   从用户提问,到屏幕上冒出第一个字,隔了多久。
# ★ 优化流式体验,核心就是盯住并压低这个 TTFT。

# === 小结 ===
# ★ 大模型的生成是逐 token 进行的:它不是一下子想出整
#   段答案,而是预测下一个 token 接上去再预测下一个,
#   一个个往外蹦直到结束 —— 所以回答的第一个字远在
#   整段完成之前就生成好了,300 字回答要 10 秒但第一个
#   字 1 秒内就出来。★★ 方案 A 错在把"已经能给的"憋着
#   不给:模型一个字一个字生成着,后端却死等最后一个字
#   也完成才一次性返回,那第 1 秒就生成好的开头被硬憋了
#   10 秒,用户的空白焦虑完全是人为制造的。★ 流式输出
#   思路朴素:模型每蹦出一小块,后端立刻顺着一直开着的
#   通道推给前端,前端收一块渲一块,用户第 1 秒就看到
#   开头、看着它打字机般长出来,总时长没变但可感知等待
#   从十几秒压到一秒。★ 流式模式下决定体验的不再是总
#   耗时,而是首 token 延迟 TTFT(从提问到屏幕冒出第一
#   个字隔了多久),优化流式体验核心就是压低 TTFT。

选型:为什么是 SSE,而不是轮询或 WebSocket

# === ★ 流式要一个"服务器持续推、前端持续收"的通道 ===

# === ★ 候选 1:轮询(Polling)—— 能用,但很笨 ===
# ★ ★ 前端每隔几百毫秒,就发一个请求问后端"有新内容
#   吗"。能模拟出流式效果,但:① 大量请求是空跑,
#   浪费;② 推送有"轮询间隔"的延迟,不够实时;
#   ③ 前后端都要维护"进度"状态,实现繁琐。★ 不推荐。

# === ★ 候选 2:WebSocket —— 能用,但"杀鸡用牛刀" ===
# ★ ★ WebSocket 提供一条【全双工】通道,前后端都能
#   随时主动发消息。它很强,但对"大模型流式输出"这个
#   场景【过重】了。
# ★ ★ 想清楚这个场景的特点:数据流是【单向】的 ——
#   用户问一次,然后就只是【服务器单向地往下推】文字,
#   前端在这个过程中不需要再发什么。用全双工的
#   WebSocket,等于用了一个你根本用不上的能力,还要
#   承担它额外的协议复杂度、连接维护成本。

# === ★★ 候选 3:SSE —— 为这个场景量身定做 ===
# ★ ★ SSE(Server-Sent Events,服务器发送事件):基于
#   普通 HTTP 的一种机制,允许服务器在一个【一直不关闭】
#   的响应里,持续地、单向地向客户端推送数据。
# ★ ★★ 它和这个场景【严丝合缝】:正好是"服务器单向
#   推"。而且它的好处一大把:
#  - ★ 就是普通 HTTP,无需协议升级,天然穿透代理、防火墙;
#  - ★ 浏览器原生支持(EventSource API),自带断线重连;
#  - ★ 实现简单 —— 后端就是"别关闭响应,持续往里写"。
# ★ 结论:单向推送的流式场景,SSE 是性价比最高的选择。

# === ★ SSE 的数据格式:简单到一眼就懂 ===
# ★ ★ SSE 在 HTTP 响应体里,用一种极简的文本格式推数据。
#   每一条消息,就是一行 "data: 你的内容",后面跟一个
#   空行。响应头要带 Content-Type: text/event-stream。
# ★ 大模型流式,就是把每一小块文字,按这个格式一条条写出去。

# === 小结 ===
# ★ 流式需要一个"服务器持续推、前端持续收"的通道。
#   候选 1 轮询:前端每隔几百毫秒发请求问有没有新内容,
#   能模拟但大量请求空跑浪费、有轮询间隔延迟不够实时、
#   前后端都要维护进度状态繁琐,不推荐。候选 2 WebSocket:
#   提供全双工通道前后端都能随时主动发,但对大模型流式
#   过重 —— 这个场景数据流是单向的(用户问一次然后服务器
#   单向往下推,前端不需再发),用全双工等于用了根本用不上
#   的能力还要承担额外协议复杂度和连接维护成本。★★ 候选
#   3 SSE(Server-Sent Events):基于普通 HTTP,允许服务器
#   在一个一直不关闭的响应里持续单向推数据,和这个场景
#   严丝合缝,好处一大把 —— 就是普通 HTTP 无需协议升级
#   天然穿透代理防火墙、浏览器原生支持 EventSource 自带
#   断线重连、实现简单(后端就是别关响应持续往里写)。
# ★ SSE 数据格式极简:响应体里每条消息就是一行
#   "data: 内容"后跟一个空行,响应头带
#   Content-Type: text/event-stream;大模型流式就是把
#   每一小块文字按这个格式一条条写出去。

后端:从大模型流式拿数据,再用 SSE 转发

# === ★ 后端要做的:当一根"中转管道" ===

# === ★ 第 1 步:用流式模式调用大模型 ===
# ★ ★ 大模型的 API,基本都支持流式 —— 调用时把 stream
#   参数打开。这之后,API 返回的不再是"一个完整结果",
#   而是一个【可以逐块迭代】的流:你用一个循环去 for
#   它,每次循环,拿到模型刚生成的一小块文字。

# === ★ 第 2 步:把每一小块,按 SSE 格式立刻写出去 ===
# ★ ★ 关键就一个字:【快】。从模型那边每收到一小块,
#   就【立刻】把它包成 "data: ..." 格式,写进给前端的
#   响应里,然后【立刻 flush】(把缓冲区刷出去)。
# ★ ★★ 千万别"攒着":别想着"收集几块一起发",那等于
#   又把流式退化回了批量,首 token 延迟全毁了。

# === ★ 第 3 步:用一个明确的信号,告诉前端"说完了" ===
# ★ ★ 流是没有天然"结尾"的。所以约定一个【结束信号】
#   —— 比如发一条特殊的 "data: [DONE]"。前端收到它,
#   就知道这次回答彻底结束,可以收尾了。

# === ★ 后端的三个 HTTP 细节,缺一不可 ===
# ★ ★ ① 响应头 Content-Type 必须是 text/event-stream;
# ★ ★ ② 别让框架/服务器把响应"攒起来再发"—— 要确保
#   每写一块就真的发出去(很多框架要显式用流式响应类型);
# ★ ★★ ③ 当心反向代理:Nginx 这类代理默认会【缓冲】
#   后端响应(攒一大坨再转发),这会把你的流式彻底打回
#   原形。必须给这个接口关掉代理缓冲(后面"坑"里细说)。

# === 小结 ===
# ★ 后端的角色是一根中转管道,做三步。第 1 步用流式模式
#   调大模型:把 stream 参数打开,之后 API 返回的不再是
#   一个完整结果而是一个可逐块迭代的流,用循环 for 它每次
#   拿到模型刚生成的一小块文字。第 2 步把每一小块按 SSE
#   格式立刻写出去,关键就一个字快 —— 每收到一小块就立刻
#   包成 "data: ..." 写进响应再立刻 flush,★★ 千万别攒着
#   (攒几块一起发等于把流式退化回批量,首 token 延迟全
#   毁)。第 3 步用明确信号告诉前端说完了 —— 流没有天然
#   结尾,约定一个结束信号如 "data: [DONE]"。★ 三个 HTTP
#   细节缺一不可:① Content-Type 必须是 text/event-stream;
#   ② 别让框架/服务器把响应攒起来再发,确保每写一块就真
#   发出去;★★ ③ 当心反向代理,Nginx 默认会缓冲后端响应
#   (攒一大坨再转发)把流式打回原形,必须给这个接口关掉
#   代理缓冲。
# ★ 后端(FastAPI):从大模型流式拿数据,再用 SSE 转发给前端
import json
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI()

def stream_answer(question):
    # ★ 第 1 步:用流式模式调用大模型 —— 关键是 stream=True
    completion = client.chat.completions.create(
        model='gpt-4o-mini',
        messages=[{'role': 'user', 'content': question}],
        stream=True,                       # ★ 打开流式,返回值变成可迭代的流
    )
    # ★ 第 2 步:每收到一小块,立刻按 SSE 格式 yield 出去
    for chunk in completion:
        delta = chunk.choices[0].delta.content
        if delta:
            # ★★ 用 JSON 包一层 —— delta 里可能含换行,裸写会破坏 SSE 行格式
            payload = json.dumps({'text': delta}, ensure_ascii=False)
            yield f'data: {payload}\n\n'   # ★ 一收到就发,绝不攒着
    # ★ 第 3 步:发一个明确的结束信号,告诉前端"说完了"
    yield 'data: [DONE]\n\n'

@app.get('/api/chat')
def chat(question: str):
    return StreamingResponse(
        stream_answer(question),
        media_type='text/event-stream',    # ★ 细节①:SSE 必须用这个 Content-Type
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no',     # ★★ 细节③:显式关掉 Nginx 代理缓冲
        },
    )

前端:消费 SSE 流,实现打字机效果

# === ★ 前端要做的:把后端推来的流,一块块"接住、画出来" ===

# === ★ 两种消费方式,先认清 ===
# ★ ★ 方式 1:浏览器原生的 EventSource API。它专门为 SSE
#   而生,自带断线重连,用起来最省心。★ 但它有个硬伤:
#   只支持 GET 请求,且不能自定义请求头 —— 一旦你的
#   接口要带 Authorization 头做鉴权,它就不好使了。
# ★ ★ 方式 2:用 fetch + ReadableStream 手动读流。它能
#   自定义请求方法和请求头,鉴权、POST 都没问题,代价
#   是断线重连要自己写。★ 实际产品里,因为几乎都要鉴权,
#   方式 2 反而是更常用的那个 —— 本文就用它。

# === ★ 核心循环:读一块、解析一块、渲染一块 ===
# ★ ★ fetch 拿到响应后,response.body 是一个可读流。你
#   从它身上拿一个 reader,然后写一个循环:每次 read()
#   出一小块数据,解析出里面的文字,【追加】到界面上
#   那段正在生长的回答里 —— 屏幕上就有了打字机效果。
# ★ ★ read() 返回的是字节,要用 TextDecoder 解码成文字。

# === ★★ 一个最容易踩的坑:网络分块 ≠ 消息分块 ===
# ★ ★ 你以为每次 read() 拿到的,正好是一条完整的
#   "data: ...\n\n"?★★ 不是。网络传输是按它自己的
#   节奏切块的:一次 read() 可能拿到半条消息,也可能
#   一次拿到两条半。你【不能】假设拿到的就是整条。
# ★ ★ 正解:维护一个【缓冲区(buffer)】。每次 read()
#   出的数据,先【拼接】到 buffer 末尾;然后在 buffer
#   里按分隔符(空行 \n\n)切出【已经完整】的那些消息
#   去处理;切剩下的【不完整尾巴】,留在 buffer 里,
#   等下一次 read() 的数据来补全。

# === ★ 收到结束信号,就收尾 ===
# ★ ★ 解析每条消息时,如果发现内容是约定的 [DONE],就
#   知道这次回答结束了 —— 跳出循环,做收尾(比如把
#   "正在输入"的光标隐藏掉)。

# === 小结 ===
# ★ 前端要把后端推来的流一块块接住画出来。两种消费方式:
#   方式 1 浏览器原生 EventSource 专为 SSE 而生自带断线
#   重连最省心,但硬伤是只支持 GET 且不能自定义请求头,
#   接口要带 Authorization 鉴权就不好使;方式 2 用 fetch
#   + ReadableStream 手动读流,能自定义方法和请求头鉴权
#   POST 都行,代价是断线重连自己写 —— 实际产品几乎都要
#   鉴权所以方式 2 更常用。★ 核心循环:fetch 拿到响应后
#   response.body 是可读流,拿一个 reader 写循环,每次
#   read() 出一小块、解析出文字、追加到界面那段正在生长
#   的回答里,屏幕就有了打字机效果;read() 返回字节要用
#   TextDecoder 解码。★★ 最容易踩的坑:网络分块≠消息分块
#   —— 一次 read() 可能拿到半条消息也可能两条半,不能
#   假设拿到的是整条;正解是维护一个缓冲区,每次 read()
#   的数据先拼到 buffer 末尾,再按空行切出已完整的消息
#   处理,不完整的尾巴留在 buffer 等下次补全。★ 解析时
#   发现内容是约定的 [DONE] 就知道结束,跳出循环做收尾。
// ★ 前端:用 fetch + ReadableStream 消费 SSE 流,实现打字机效果
async function streamChat(question, onToken, onDone) {
    // ★ 用 fetch 而非 EventSource —— 这样才能带请求头做鉴权、用 POST
    const resp = await fetch('/api/chat?question=' + encodeURIComponent(question), {
        headers: { 'Authorization': 'Bearer ' + getToken() },
    });

    // ★ resp.body 是一个可读流;拿一个 reader 来逐块读取
    const reader = resp.body.getReader();
    const decoder = new TextDecoder();
    // ★★ 关键:缓冲区。网络分块 != 消息分块,必须自己攒、自己切
    let buffer = '';

    while (true) {
        const { done, value } = await reader.read();
        if (done) break;                       // ★ 流读完了

        // ★ value 是字节,解码成文字,拼到缓冲区末尾
        buffer += decoder.decode(value, { stream: true });

        // ★★ 在缓冲区里按空行切出【已完整】的消息;不完整的尾巴留下
        const parts = buffer.split('\n\n');
        buffer = parts.pop();                  // ★ 最后一段可能不完整,留着等下次补

        for (const part of parts) {
            const line = part.trim();
            if (!line.startsWith('data:')) continue;
            const data = line.slice(5).trim();
            if (data === '[DONE]') {           // ★ 收到结束信号,收尾
                onDone();
                return;
            }
            // ★ 后端用 JSON 包了一层,这里解开,把这一小块文字交给界面
            const { text } = JSON.parse(data);
            onToken(text);                     // ★ 追加渲染 —— 打字机效果就来自这里
        }
    }
    onDone();
}

// ★ 用法:每来一小块就追加到回答区,流结束就隐藏"正在输入"光标
streamChat(
    userQuestion,
    (token) => { answerBox.textContent += token; },
    () => { cursor.style.display = 'none'; },
);

工程坑:错误处理、中断、Nginx 缓冲、计费

# === ★ 流式跑通不难,跑稳很难 —— 坑都在这一节 ===

# === ★★ 坑 1:流已经开始了,中途才出错,怎么办 ===
# ★ ★ 普通请求出错,你可以返回一个 500 状态码。但流式
#   不行:响应头(含状态码)在【第一个字节发出时】就
#   已经定死了,是 200。等你流到一半,模型那边报错了,
#   你【已经没法改状态码】了。
# ★ ★★ 正解:把错误,也当成一条【数据】,顺着同一个流
#   推下去 —— 比如发一条 "data: {"error": "..."}"。前端
#   解析时,认出这是错误消息,就停止打字、给用户报错。
#   错误处理,要走【数据通道】,不能再指望状态码。

# === ★ 坑 2:用户中途关了页面,后端别空转 ===
# ★ ★ 用户看了开头不满意,直接关掉页面 / 切走。此时前端
#   连接断了,但你的后端循环可能【还在傻傻地】问大模型
#   要 token —— 而这些 token,已经【没有任何人在等】了。
# ★ ★ 这是实打实的浪费:白烧大模型的钱、白占服务器的
#   连接。正解:在转发循环里,定期【检查客户端是否还
#   连着】,一旦发现断了,立刻 break,并停掉对大模型的
#   调用。

# === ★★ 坑 3:Nginx 缓冲 —— 流式最大的隐形杀手 ===
# ★ ★ 这个坑能让你"代码全对,流式却失效",排查到怀疑
#   人生。Nginx 等反向代理,默认开着 proxy_buffering ——
#   它会把后端的响应【先攒在自己这儿】,攒够一大坨,
#   再一次性转发给浏览器。
# ★ ★★ 后果:你后端明明一个字一个字地往外发,Nginx
#   却给你【攒成一整段】再放行。用户那边,流式体验
#   荡然无存,又变回了"对着空白屏幕干等"。
# ★ 解法:给这个 SSE 接口,在 Nginx 配置里关掉
#   proxy_buffering;后端再回一个 X-Accel-Buffering: no
#   响应头双保险(前面后端代码里已经加了)。

# === ★ 坑 4:流式下,token 用量怎么统计 ===
# ★ ★ 普通(非流式)调用,大模型会在最后的结果里,附上
#   这次用了多少 token。但流式模式下,默认的那些 chunk
#   里【没有】用量信息。
# ★ ★ 后果:你按 token 计费 / 做配额,会发现【统计不到】。
#   解法:调用大模型时,显式打开"在流末尾带上用量统计"
#   的选项(不同 API 选项名不同),拿到最后那个带 usage
#   的 chunk,再去记账。

# === ★ 坑 5:盯住 TTFT,而不是总耗时 ===
# ★ 前面说过,流式体验的核心指标是首 token 延迟(TTFT)。
#   ★ 监控上要专门埋一个点:从"收到用户请求"到"第一个
#   字节发给前端",这段时间是多少。它要是慢了,流式的
#   全部优势就被吃掉了 —— 用户照样在干等。

# === 认知 ===
# ★ 流式跑通不难跑稳很难。★★ 坑 1 流已开始中途才出错:
#   响应头含状态码在第一个字节发出时就定死是 200,流到
#   一半模型报错你已没法改状态码;正解是把错误也当成一条
#   数据顺着同一个流推下去(如 data: {"error":...}),前端
#   认出是错误消息就停止打字报错 —— 错误处理走数据通道
#   不能再指望状态码。★ 坑 2 用户中途关页面后端别空转:
#   前端连接断了后端循环可能还在傻傻问大模型要 token 而
#   这些 token 没人等了,白烧钱白占连接;正解是转发循环
#   里定期检查客户端是否还连着,断了立刻 break 并停掉对
#   大模型的调用。★★ 坑 3 Nginx 缓冲是流式最大的隐形
#   杀手:反向代理默认开 proxy_buffering 会把后端响应先
#   攒一大坨再一次性转发,后果是你一个字一个字发它给你
#   攒成一整段,流式体验荡然无存;解法是给 SSE 接口关掉
#   proxy_buffering,后端再回 X-Accel-Buffering: no 双
#   保险。★ 坑 4 流式下 token 用量默认 chunk 里没有,
#   按 token 计费会统计不到,要显式打开"流末尾带用量
#   统计"的选项拿到带 usage 的 chunk 再记账。★ 坑 5 盯
#   住 TTFT 而不是总耗时,埋点监控从收到请求到第一个
#   字节发给前端的时间,它慢了流式优势就被吃掉。
# ★ 后端加固版:处理"中途出错"和"客户端断开"两个核心工程坑
import json
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from openai import OpenAI

app = FastAPI()
client = OpenAI()

async def safe_stream(question: str, request: Request):
    try:
        completion = client.chat.completions.create(
            model='gpt-4o-mini',
            messages=[{'role': 'user', 'content': question}],
            stream=True,
            stream_options={'include_usage': True},  # ★ 坑4:让流末尾带上 token 用量
        )
        for chunk in completion:
            # ★★ 坑2:每转发一块前,先看客户端还连着没 —— 断了就别再空转
            if await request.is_disconnected():
                print('客户端已断开,停止转发,不再白烧大模型的钱')
                break

            if not chunk.choices:
                # ★ 坑4:这种 chunk 不带文字,只带 usage —— 在这里记账
                if chunk.usage:
                    record_token_usage(chunk.usage.total_tokens)
                continue

            delta = chunk.choices[0].delta.content
            if delta:
                payload = json.dumps({'text': delta}, ensure_ascii=False)
                yield f'data: {payload}\n\n'
        yield 'data: [DONE]\n\n'

    except Exception as e:
        # ★★ 坑1:流已经开始,状态码改不了(早已是 200)——
        #    只能把错误当成一条【数据】,顺着同一个流推给前端
        err = json.dumps({'error': str(e)}, ensure_ascii=False)
        yield f'data: {err}\n\n'

@app.get('/api/chat')
async def chat(question: str, request: Request):
    return StreamingResponse(
        safe_stream(question, request),
        media_type='text/event-stream',
        headers={
            'Cache-Control': 'no-cache',
            'X-Accel-Buffering': 'no',     # ★ 坑3:后端侧关代理缓冲(还需在 Nginx 配置里关)
        },
    )

命令速查

大模型流式输出:为什么 + 怎么做
=============================================================
为什么要流式   大模型本就逐 token 生成,第一个字远在整段
              完成前就好了;方案A硬等全部生成完才返回,
              白白制造了十几秒的"空白焦虑"
核心指标       TTFT 首 token 延迟 —— 从提问到屏幕冒出第一
              个字隔了多久,流式优化就是盯住并压低它

选型:为什么是 SSE
-------------------------------------------------------------
轮询           前端每隔几百毫秒问一次,空跑浪费、有延迟,笨
WebSocket      全双工,但这个场景是单向推,杀鸡用牛刀
SSE            基于普通 HTTP、单向推、浏览器原生支持 ——
              和"服务器单向推文字"严丝合缝,首选

后端三步 + 三个 HTTP 细节
-------------------------------------------------------------
第1步   stream=True 调大模型,返回值变成可迭代的流
第2步   每收到一小块,立刻按 "data: ..." 格式写出并 flush
第3步   发结束信号 data: [DONE] 告诉前端"说完了"
细节①   Content-Type 必须是 text/event-stream
细节②   别让框架把响应攒起来再发(用流式响应类型)
细节③   关掉 Nginx 代理缓冲(X-Accel-Buffering: no)

前端 + 四个工程坑
-------------------------------------------------------------
前端     fetch 读 ReadableStream,维护缓冲区按空行切消息
        (网络分块 != 消息分块,不能假设拿到的是整条)
坑1     流已开始状态码改不了 -> 错误走数据通道 data: error
坑2     客户端断开 -> 循环里查 is_disconnected,断了就停
坑3     Nginx proxy_buffering -> 必须关,否则流式失效
坑4     流式默认无 token 用量 -> 显式开 include_usage

口诀:大模型本就逐 token 生成,别硬等它全部完成才返回
      单向推文字的场景,SSE 是性价比最高的选择
      盯住 TTFT,而不是总耗时

避坑清单

  1. 大模型本就是逐 token 生成的,回答的第一个字远在整段完成之前就好了,方案 A 硬等全部生成完才返回是白白浪费了"边生成边返回"的天然可能
  2. 流式模式下决定体验的不再是总耗时,而是首 token 延迟 TTFT(从提问到屏幕冒出第一个字),优化流式核心就是盯住并压低 TTFT
  3. 轮询能模拟流式但很笨(大量请求空跑、有轮询间隔延迟);WebSocket 是全双工对"服务器单向推"这个场景过重,SSE 才是为此场景量身定做的
  4. 后端转发时关键就一个字"快":每收到一小块就立刻写出并 flush,千万别"攒几块一起发",那等于把流式退化回了批量、首 token 延迟全毁
  5. SSE 响应的 Content-Type 必须是 text/event-stream,且要约定一个明确的结束信号(如 data: [DONE]),否则前端不知道流何时结束
  6. delta 文字里可能含换行,裸写进 "data: " 会破坏 SSE 的行格式,必须用 JSON 包一层再写出去
  7. 前端消费 SSE 时网络分块 != 消息分块,一次 read() 可能拿到半条或两条半,必须维护缓冲区按空行切出完整消息、不完整的尾巴留着等下次
  8. 流式响应头(含状态码)在第一个字节发出时就定死是 200,流到一半出错已改不了状态码,错误必须当成一条数据顺着流推下去
  9. 用户中途关页面后,后端转发循环要定期检查客户端是否还连着,断了立刻停掉对大模型的调用,否则白烧 token 的钱、白占连接
  10. Nginx 等反向代理默认开 proxy_buffering 会把响应攒成一大坨再转发、让流式彻底失效,必须给 SSE 接口关掉它(后端再回 X-Accel-Buffering: no 双保险)

总结

这一趟把流式输出彻底理清的过程,纠正了我一个特别根深蒂固的工程习惯——我总把"返回结果"这件事,理解成一个【在终点发生的、一次性的】动作。在我过去做过的几乎所有接口里,逻辑都是这样的:接收请求,吭哧吭哧把活干完,然后,在最后一刻,把那个【完整的、成品的】结果,一次性交出去。这个"干完→再交付"的顺序,我从没觉得有什么问题——结果当然要等它"完成"了才能给,没完成的半成品,交出去干嘛?可大模型这个场景,结结实实地把这个习惯打碎了。我盯着那个"对着空白屏幕干等十几秒"的产品,终于看清了我那版方案 A 真正的错误:模型在第 1 秒就已经生成好了回答的第一个字,这个字,【完全可用】、完全可以立刻送到用户眼前;可我的代码,非要把它和后面那两百多个还没生成的字【捆在一起】,死死等到【最后一个字】也落定,才肯整体放行。我亲手把一个"第 1 秒就能交付的东西",憋成了"第 15 秒才交付"。用户那十几秒的焦虑,不是模型慢造成的,是我那个"必须完整才交付"的执念造成的。想明白这件事,我意识到我真正要修正的,是把"完成"和"可用"这两个词,长久地、想当然地划了等号。它们【不是一回事】。一份回答,在它【全部】生成完之前,它的【前面一部分】,早就已经处于"可用"状态了。流式输出的全部精髓,朴素得令人惭愧:一份成果里,哪一部分先就绪了,就让哪一部分先去发挥它的价值,不要让它陪着还没就绪的其它部分一起空等。价值,不是在"全部完成"那一刻一次性产生的;它是【随着每一小部分的就绪,一点一点、持续不断地】产生的——你越早把已经就绪的那部分交付出去,价值就越早开始累积。这个道理想通了,我发现它早就溢出了"流式输出"这个技术点本身。它让我重新打量我做事的方式:一份要写一周的文档,我是非要等它"全部完美"了才给同事看,还是写完一个章节就先发出去、让它先开始被讨论被使用?一个要做一个月的功能,我是攒一个月再一次性上线,还是把它拆成能独立交付的小块、做完一块就上线一块?这背后,是同一个我过去一直没想透的道理:不要用"还没完成的整体",去拖累"已经可用的部分"。真正成熟的交付观,不是追求那个"啪"地一下、完整登场的、惊艳的终点;而是想清楚——我手里这件事,有哪一部分【现在、立刻】就已经能交付了?然后,就把它交付出去。然后是下一部分,再下一部分。把"在终点一次性交付",换成"尽早地、持续地,交付每一个已经就绪的部分"——这是大模型的流式输出教给我的,也是我打算往后一直带着走的一件事。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

数据库连接池完全指南:连接池耗尽是如何拖垮整个服务的

2026-5-21 12:59:51

技术教程

Docker 镜像优化完全指南:1.4GB 镜像是如何瘦到 80MB 的

2026-5-21 13:16:08

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索