大模型流式输出完全指南:从一次"用户问完干等十几秒才看到回答"看懂 SSE 流式响应

2023 年我做一个 AI 对话产品,用户输入问题,后端调大模型,把回答返回前端展示。第一版我的写法最直觉:后端收到问题,调模型接口,等模型把整段回答生成完,拿到完整文本一次性返回前端。本地测一两秒出结果挺好,可一上线问稍复杂的问题体验就糟糕得我自己都用不下去:用户点发送后屏幕什么都没有,一个空白转圈的加载状态持续十几秒,直到三四百字回答全部生成完才整段蹦出来,这十几秒里用户完全不知道发生了什么,很多人没等到就划走了。我以为是模型太慢想换个快的,可问题本质不在快慢,大模型生成长文本本来就需要时间,我真正错的是让用户对这十几秒的过程完全无感。后来才想明白,体验好的 AI 产品回答都不是等全部生成完再一次性出现,而是一个字一个字像打字一样实时蹦出来,背后是大模型本来就逐 token 生成这个事实,我的第一版硬生生等它吐完所有 token 攒成一整段才返回,白白浪费了边生成边展示的机会。本文从头梳理:为什么非流式体验差、模型为什么本就逐 token 生成、SSE 协议怎么回事、后端流式接口怎么写、前端怎么用可读流边读边渲染,以及流式下出错没法改状态码、客户端中途断开要停止生成别白烧 token、边推流边攒完整文本落库、中间网关代理缓冲会毁掉流式效果这些工程坑。核心一句:流式输出就是把一个用户什么都看不到的十几秒黑盒重新打开,让用户看见回答正在一个字一个字地生长出来。

2023 年我做一个 AI 对话产品:用户输入一个问题,后端调大模型,把回答返回到前端展示。第一版我的写法是最直觉的那种:后端收到问题,调模型的接口,等模型把整段回答生成完,拿到完整文本,一次性返回给前端。功能上完全没问题——本地测试,问个简单问题,一两秒就出结果,挺好。可一上线、一问稍微复杂点的问题,体验就糟糕得让我自己都用不下去:用户点了发送,然后屏幕上什么都没有——一个空白的、转着圈的加载状态,持续十几秒,直到模型把三四百字的回答全部生成完,才"啪"地一下整段蹦出来。这十几秒里,用户完全不知道发生了什么:是卡住了?是没网?是这个产品坏了?很多人没等到回答出来,就已经划走了。我一开始以为是模型太慢,想换个更快的模型,可问题的本质不在快慢——大模型生成长文本,本来就是需要时间的,生成三四百字花十几秒,是它的正常速度。我真正错的地方,是我让用户对着这十几秒的过程完全无感。后来我才彻底想明白:你去看 ChatGPT、看任何一个体验好的 AI 产品,它们的回答都不是"等全部生成完再一次性出现"的,而是一个字一个字、像打字一样实时蹦出来的。这不是什么花哨的动画效果——它的背后,是大模型本来就是逐 token 生成的这个事实:模型不是先在内部憋出一整段话再给你,它是一个 token 一个 token 往外吐的。我的第一版代码,是硬生生地等它吐完所有 token、攒成一整段,才返回——白白把"边生成边展示"的机会浪费掉了。要让用户像看 ChatGPT 那样,生成到哪就看到哪,需要的是流式输出:模型每吐出一小段,后端就立刻把这一小段推给前端,前端收到就立刻显示。我以为流式不过是"把返回改成一段一段的",结果真做下来坑一个接一个:用什么协议推、后端接口怎么写、前端怎么接、流到一半出错了怎么办、用户中途关了页面怎么办……那次之后我才认真把流式输出从头搞明白。这篇文章就把它梳理一遍:为什么需要流式、模型为什么本就是逐 token 生成的、SSE 协议怎么回事、后端流式接口怎么写、前端怎么接收,以及把流式输出真正做好要避开的那些坑。

问题背景

先把那次的现象和我的误判讲清楚,后面所有的设计都是冲着纠正这个误判去的。

现象:一个 AI 对话产品,后端"等模型生成完整回答,再一次性返回前端"。问题稍复杂时,模型要十几秒才能生成完,这十几秒里用户对着一个空白加载界面干等,完全不知道进展,大量用户没等到结果就流失了。

我当时的错误认知:"接口就该是'请求—等待—拿到完整结果'。模型慢,那就换个快的模型。"

真相:大模型生成长文本本来就需要时间,这是它的固有特性,换模型只能缓解不能根治。但大模型有一个关键事实被忽略了——它是逐 token 生成的,内容是一点一点产出的,不是憋到最后一次性产出的。"等它全部生成完再返回",等于主动放弃了"边生成边展示"的可能。正确的做法是流式输出:模型每产出一小段,后端就立刻通过一个支持持续推送的通道(通常是 SSE)把它送给前端,前端收到就立刻渲染。用户看到的就从"干等十几秒再看到全部",变成"立刻开始、一个字一个字地看到回答生长出来"。

要把流式输出做好,需要几块认知:

  • 为什么"等生成完再返回"体验差,流式到底改变了什么;
  • 为什么大模型天然支持流式——它本就是逐 token 生成的;
  • SSE 协议是什么,它和普通 HTTP 响应有何不同;
  • 后端流式接口、前端流式接收分别怎么写;
  • 流式下的错误处理、客户端取消、完整文本收集这些工程坑怎么处理。

一、为什么需要流式:让用户看到"正在生成"

先看清"等生成完再返回"这种模式,差在哪。

它差的不是总耗时——模型生成三四百字要十几秒,流式不流式,这十几秒都省不掉。它差的是用户的感知。非流式模式下,用户经历的是"十几秒纯粹的空白 + 末尾瞬间出现全部";流式模式下,用户经历的是"几百毫秒后第一个字出现,然后回答持续地、可见地生长"。同样是十几秒,前者让人焦虑、怀疑、想放弃,后者让人安心——因为用户全程都能确认"它在工作、在出东西"。在 AI 产品里,这个差别几乎是决定生死的。下面这段代码,就是那个"干等"模式:

from openai import OpenAI

client = OpenAI()


def answer_blocking(question: str) -> str:
    # 反面教材:等模型把整段话【全部】生成完,才一次性返回。
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": question}],
    )
    return resp.choices[0].message.content
    # 问题:模型生成 300 字可能要十几秒。这十几秒里,这个函数
    # 一直卡在 create() 这一行不返回,前端只能对着空白屏幕干等,
    # 一个字都看不到 —— 哪怕模型其实早就生成出前几十个字了。

这段代码的问题,不是它写错了,而是它主动丢掉了一个本来就有的机会。模型在生成第 5 秒时,其实早就产出了回答的前一两百字——这些内容本可以立刻给用户看,可 answer_blocking 把它们全憋在 create() 里,非要等最后一个字也生成完才一起放出来。要利用上这个机会,得先理解一件事:模型的内容,到底是怎么产出来的。

二、模型逐 token 生成:流式不是改造,是顺其自然

很多人以为流式输出是给模型"加"的一个功能,需要某种特殊改造。其实恰恰相反——逐 token 产出,是大模型本来的工作方式。流式不是改造它,而是顺着它本来的样子,不去硬憋。

大模型生成文本的过程,本质是一个 token 接一个 token 地预测:它根据已有的内容,预测下一个最可能的 token,吐出来;再把这个新 token 加进去,预测再下一个……如此循环,直到生成结束。也就是说,模型在生成第 100 个 token 时,前 99 个 token 早就已经产出了。"等生成完再返回"那种模式,是 API 客户端在背后替你把这些 token 一个个接住、攒起来,等齐了才一次性交给你——这个"攒"的动作,是客户端做的,不是模型的限制。

所以开启流式,你要做的只是告诉 API:别替我攒了,每来一个 token(或一小段),就立刻交给我。在 OpenAI 的接口里,这只是一个 stream=True 参数:

def answer_stream(question: str):
    """开启流式:模型每生成一小段,就立刻拿到一个 chunk。"""
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": question}],
        stream=True,                  # 关键:开启流式,不再等全部生成完
    )
    # stream 是一个可迭代对象,模型每吐一小段,这个循环就转一次
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:                     # delta 有时是 None(比如首尾),要判断
            yield delta               # 每拿到一小段,就立刻往外吐

注意这个函数返回的不再是一个完整字符串,而是一个生成器——它自己也是"流式"的:每从模型那拿到一小段(delta),就 yield 出去一段。调用方可以一边迭代它、一边处理,根本不用等它跑完。模型这一端的流式有了,接下来的问题是:怎么把这个"一段一段往外吐"的流,透过 HTTP 传到前端?普通的 HTTP 响应是"一次性发完整个 body"的,装不了这种"持续地、一段一段发"的内容。这就要用到 SSE。

三、SSE:让服务端能持续往客户端推数据

普通的 HTTP 请求-响应是"一问一答":客户端发一个请求,服务端把完整的响应体一次性发回去,这次交互就结束了。它天生不适合"服务端持续地、分多次往客户端推送"的场景。

SSE(Server-Sent Events,服务端推送事件)就是为这个场景设计的。它本质上还是一个 HTTP 响应,但有两个关键不同:其一,响应头里 Content-Typetext/event-stream,告诉客户端"这是一个会持续推的流,别等它结束";其二,这个响应体不是一次性发完,而是分成一条一条的"消息",服务端可以在连接保持打开的状态下,一条接一条地往外发。SSE 的消息格式极简单:每条消息以 data: 开头,以一个空行(两个换行符)结尾。

import json


def to_sse(payload: dict) -> str:
    """把一个数据包装成一条标准的 SSE 消息。"""
    text = json.dumps(payload, ensure_ascii=False)
    # SSE 规定:消息以 "data: " 开头,以两个换行符结尾。
    # 内容用 json 包一层,可以稳妥地容纳换行、特殊字符。
    return f"data: {text}\n\n"


# 流结束时,约定发一条特殊消息,明确告诉前端"推完了"。
SSE_DONE = "data: [DONE]\n\n"

为什么用 SSE,而不是大家更熟的 WebSocket?因为这个场景是单向的——只需要服务端往客户端推,客户端不需要在同一个连接上往回说话。SSE 正是为"服务端单向推送"设计的,它就是普通 HTTP,不需要协议升级,实现简单、和现有的 HTTP 基础设施(网关、鉴权)天然兼容。WebSocket 是双向的,功能更强,但对这个单向场景属于"杀鸡用牛刀"。有了 SSE 这个传输形式,后端的流式接口就能写出来了。

四、后端流式接口:把模型的流接到 SSE 上

后端要做的事,就是把第二节那个"逐段吐出的生成器",和第三节那个"SSE 消息格式",接到一起:每从模型生成器拿到一小段,就把它包装成一条 SSE 消息发出去;模型流结束了,就发一条 [DONE]。Web 框架通常提供一个"流式响应"对象来承载它,这里用 FastAPI 的 StreamingResponse:

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()


@app.get("/chat")
def chat(question: str):
    """流式接口:边收模型的输出,边用 SSE 往前端推。"""

    def event_stream():
        # 迭代模型生成器:每拿到一小段,立刻包成 SSE 消息 yield 出去
        for delta in answer_stream(question):
            yield to_sse({"delta": delta})
        # 模型流结束,补一条 [DONE] 让前端知道可以收尾了
        yield SSE_DONE

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",   # 关键:声明这是 SSE 流
    )

这段代码的精髓,在 event_stream 是个生成器,而 StreamingResponse边迭代它、边把 yield 出来的东西发给客户端。所以整条链路是完全流式打通的:模型吐出一小段 → answer_stream 把它 yield 出来 → event_stream 把它包成 SSE 消息 yield 出去 → StreamingResponse 立刻把这条消息发给前端。中间没有任何一个环节在"攒"。模型生成到第几个字,前端就几乎同步地收到第几个字。和 answer_blocking 那种"全程憋着、最后一次性给"对比,差别一目了然。后端的流通了,最后一步是前端怎么接。

五、前端接收流:边读边渲染

前端接收 SSE 流,要点是不能等响应结束再读——这个响应根本不会"结束",它是持续推的。要用可读流的方式,读一段、处理一段。现代浏览器的 fetch 返回的响应体就是一个可读流,可以拿到一个 reader 持续地读:

async function streamChat(question) {
  const resp = await fetch(`/chat?question=${encodeURIComponent(question)}`);
  const reader = resp.body.getReader();      // 拿到响应体的可读流
  const decoder = new TextDecoder();
  let buffer = "";

  while (true) {
    const { done, value } = await reader.read();   // 读一小块
    if (done) break;                               // 流真正结束了
    buffer += decoder.decode(value, { stream: true });

    // SSE 消息之间用空行(\n\n)分隔,按它切分逐条处理
    const parts = buffer.split("\n\n");
    buffer = parts.pop();              // 最后一段可能不完整,留到下次拼

    for (const part of parts) {
      if (!part.startsWith("data: ")) continue;
      const payload = part.slice(6);   // 去掉 "data: " 前缀
      if (payload === "[DONE]") return;            // 收到结束标记
      const { delta } = JSON.parse(payload);
      appendToScreen(delta);           // 把这一小段【立刻】渲染到屏幕上
    }
  }
}

这段前端代码有个容易被忽略的关键点:网络传来的数据块,和 SSE 的"消息"边界,是不对齐的。你 reader.read() 读到的一块,可能包含两条半 SSE 消息,也可能只有半条。所以这里用一个 buffer 把读到的内容攒着,按 \n\n 切出完整的消息来处理,把最后那段可能不完整的留在 buffer 里、等下一块数据来了再拼。处理每条消息时,拿到 delta立刻 appendToScreen——这就是用户看到的"一个字一个字往外蹦"。至此,从模型到屏幕的整条流式链路就通了。但要把它做,还有几个坑。

六、工程坑:流式下的错误、取消与完整文本

主干通了,但流式输出有几个非流式时根本不存在的工程坑。

坑 1:流已经开始推之后才出错,没法再改 HTTP 状态码。普通接口出错,你返回个 500 状态码就行。但流式接口,一旦第一个 delta 发出去,HTTP 状态码(200)和响应头早已发给客户端了——此刻模型那边突然报错(比如超时、触发内容审查),你没法再把 200 改成 500。唯一的办法,是在流的内部发一条特殊的"错误消息",让前端自己识别和处理:

def safe_event_stream(question: str):
    """流式里的错误处理:出错后只能在流内部传达,不能改状态码。"""
    try:
        for delta in answer_stream(question):
            yield to_sse({"delta": delta})
    except Exception as e:
        # 此刻 HTTP 200 和响应头早已发出 —— 改不了状态码了。
        # 只能在流里发一条 error 事件,约定好让前端识别并提示用户。
        yield to_sse({"error": f"生成中断: {e}"})
    finally:
        yield SSE_DONE                    # 无论如何都补一条结束标记

坑 2:用户中途关掉页面,要停止生成,别再白烧 token。用户问了个问题,模型刚生成到一半,用户不耐烦把页面关了。如果你的后端不感知这件事,它会傻乎乎地把整段话生成完——而这些 token 是要花钱的,全白烧了。所以流式接口要检测客户端是否已断开,一旦断开,立刻停止迭代模型:

from fastapi import Request


@app.get("/chat2")
async def chat2(question: str, request: Request):
    """检测客户端断开:用户一关页面,立刻停止生成,不再浪费 token。"""

    async def event_stream():
        for delta in answer_stream(question):
            # 每推一段前,先看客户端还在不在
            if await request.is_disconnected():
                break                     # 客户端走了,果断停止生成
            yield to_sse({"delta": delta})
        yield SSE_DONE

    return StreamingResponse(event_stream(), media_type="text/event-stream")

坑 3:流式输出的同时,别忘了把完整文本攒下来。流式是"发完一段就不管了",但你的业务往往还需要那段完整的回答——要落库存进对话历史、要记日志、要做内容审查。所以后端在往外推每一段的同时,要自己也留一份,在流结束时拼成完整文本:

def answer_stream_collected(question: str):
    """边流式输出,边把完整文本攒下来 —— 落库、记历史都要用它。"""
    pieces = []
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": question}],
        stream=True,
    )
    for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            pieces.append(delta)          # 自己留一份
            yield delta                   # 同时往外推
    full_text = "".join(pieces)           # 流结束,这才是完整回答
    save_conversation(question, full_text)   # 落库、记历史

坑 4:注意中间网关、代理的缓冲。你后端代码写得再"流式",如果中间隔着一层会缓冲响应的网关或反向代理(比如 Nginx 默认会缓冲),那它会把你一段段发出去的内容先攒起来、攒够一批再转发——流式效果就被这层缓冲毁掉了,前端那边又变回"一顿一顿"甚至"一次性"出现。所以部署流式服务时,要确认链路上所有中间层都关掉了对这个接口的缓冲。下面这张图,把流式输出从模型到屏幕的完整链路串起来:

关键概念速查

概念 / 手段 说明
非流式的问题 等整段回答生成完才返回,用户对着空白干等十几秒,大量流失
逐 token 生成 模型本就是一个 token 接一个 token 产出的,流式是顺其自然
stream=True 告诉 API 别替你攒,每产出一小段就立刻交给你,返回一个可迭代流
SSE 服务端推送事件,基于普通 HTTP,服务端可持续分条推送消息
SSE 消息格式 每条以 data: 开头、空行结尾;约定 [DONE] 标记流结束
StreamingResponse Web 框架的流式响应,边迭代生成器边把内容发给客户端
前端可读流接收 用 reader 边读边处理,buffer 攒数据按消息边界切分逐条渲染
流式错误处理 流开始后状态码已发出,出错只能在流内部发 error 事件传达
客户端断开检测 用户关页面后检测断开并停止生成,避免白烧 token
网关缓冲 中间代理缓冲响应会毁掉流式效果,要关掉对该接口的缓冲

避坑清单

  1. 非流式"等生成完再返回"省不掉总耗时,却让用户全程对着空白干等,体验极差、流失严重。
  2. 大模型本就是逐 token 生成的,生成第 100 个时前 99 个早已产出,流式不是改造而是顺其自然。
  3. 开启流式只需 stream=True,返回的是可迭代流,每产出一小段就能立刻拿到并处理。
  4. 普通 HTTP 一次性发完整响应体,装不下持续推送;用 SSE,基于普通 HTTP 即可分条持续推。
  5. SSE 消息以 data: 开头、空行结尾;单向推送场景用 SSE 即可,不必上更重的 WebSocket。
  6. 后端用 StreamingResponse 承载生成器,整条链路不攒,模型生成到哪前端几乎同步收到哪。
  7. 前端必须边读边渲染:网络数据块和 SSE 消息边界不对齐,要用 buffer 按空行切出完整消息。
  8. 流式接口一旦开始推流,状态码和响应头已发出无法再改,出错只能在流内部发 error 事件。
  9. 要检测客户端断开,用户关页面后立刻停止生成,否则模型会把整段生成完、白白烧掉 token。
  10. 边推流边自己攒一份完整文本用于落库记历史;并确认中间网关代理没有缓冲毁掉流式效果。

总结

回头看那个"用户问完干等十几秒"的对话产品,以及我后来在流式输出上接连踩的坑,最该记住的不是某一段 SSE 的解析代码,而是我动手前那个想当然的判断——"接口就该是'请求、等待、拿到完整结果'"。这个模式对绝大多数接口都对,可它偏偏不适合大模型。因为大模型的输出有一个别的接口少见的特性:它耗时很长,但又是一点一点产出的。对这种"过程很慢、但过程本身有价值"的输出,硬套"等结果"的模式,就等于把那个缓慢却可见的生成过程,强行压缩成一个用户什么都看不到的黑盒。流式输出做的事,就是把这个黑盒重新打开,让用户看见里面正在发生什么。

所以做流式输出,真正的工程量不在"加一个 stream=True"那一下。开启流式、迭代 chunk,这部分 Demo 里谁都能跑通。真正的工程量在那条"流"经过的每一个环节:模型的流,怎么接到 HTTP 上(SSE)?后端怎么保证整条链路不"攒"、真正一段段往外发?前端怎么处理"网络块"和"消息块"不对齐的问题?流到一半出错,你怎么告诉前端——状态码已经发不出去了?用户关了页面,你的后端知道吗,还是在白烧 token?这篇文章的几节,其实就是顺着这条思路展开的:先想清楚非流式为什么体验差,再看模型为什么天然支持流式,然后是 SSE 协议、后端接口、前端接收这三段主干,最后是错误处理、客户端取消、完整文本收集这几个把流式输出真正做稳的工程细节。

你会发现,流式输出的思路和我们处理任何"耗时长、但有中间产出"的任务的工程经验都是相通的。下载一个大文件,我们会显示进度条,而不是让用户对着空白等到下载完;一个长流程的表单提交,我们会一步步显示"正在校验、正在提交、正在生成",而不是憋一个大转圈。它们的内核是同一个:当一件事注定要花不少时间,就别把这段时间做成一个用户无法感知的黑盒,要把过程本身暴露出来、可视化。大模型的回答就是这样一件事——它的生成过程,本身就是值得展示给用户看的东西。

最后想说,流式做没做扎实,差距永远不会在 Demo 里暴露——Demo 里你问个简单问题,一两秒就出结果,流式不流式几乎看不出区别。它只在真实的用户、真实的复杂问题、真实的长回答面前才显形。那时候非流式会用最难堪的方式给你结账:一个十几秒的空白加载界面,把一批又一批没耐心的用户挡在门外;一个流到一半因为模型超时而永远停住、却连个错误提示都给不出的对话;一笔又一笔花在"用户早就关了页面、模型还在傻傻生成"上的 token 账单。所以别等用户因为"这产品怎么一直转圈"而划走,在你写下第一个调模型的接口时就该想清楚:这个回答可能要生成多久?这段时间我让用户看到进展了吗?流到一半出错,我有办法告诉他吗?他要是中途走了,我的后端停得下来吗?这几个问题都有了答案,你的 AI 产品才不只是 Demo 里那个简单问题跑得通的样子,而是一个无论回答多长,用户都能第一时间看到它"正在生长"的、让人愿意等下去的可靠产品。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

接口限流完全指南:从一次"活动一开服务就被瞬时流量冲垮"看懂限流算法

2026-5-21 19:46:01

技术教程

缓存穿透击穿雪崩完全指南:从一次"加了缓存数据库反而被打挂"看懂三大缓存问题

2026-5-21 19:56:34

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索