大模型流式输出完全指南:从一次"用户盯着空白屏幕等了十几秒以为卡死"看懂 SSE 流式响应

2024 年我做一个 AI 对话产品。核心功能很简单用户输入一个问题后端调大模型把答案返回给前端显示。第一版我做得很直接前端发请求后端调模型的接口等模型把答案完整生成完把这一整段答案一次性返回给前端前端拿到后整段渲染出来。本地一测能用答案也对。可真正给同事试用问题立刻就来了同事点了发送然后对着一片空白的屏幕干等。等了三秒屏幕还是空的等到八秒他以为卡死了伸手就想刷新页面直到第十几秒答案才啪地一下整段蹦出来。他问我你这个是不是卡了可它没卡它只是在老老实实地生成。我盯着这个等十几秒然后整段蹦出的体验想了很久才想明白第一版错在一个根本的认知上我以为等模型生成完一次性返回就行了。这个想法在逻辑上没错但它忽略了一件事大模型生成一段长答案是要花时间的它是一个字一个字往外蹦的蹦完整段可能要十几秒。而我的第一版是攒着模型蹦出来的字我一个都不给用户看非要等它全部蹦完才一次性端上去。正确的做法是流式输出模型蹦一个字我就推一个字给前端这正是你在所有主流 AI 产品里看到的那个打字机效果。本文从头梳理为什么等生成完再返回体验这么差流式输出的本质是什么模型的流式 API 怎么用怎么用 SSE 把流推给前端前端怎么接收以及流的中断出错首字延迟这些把流式真正做对要避开的坑。

2024 年我做一个 AI 对话产品。核心功能很简单:用户输入一个问题,后端调大模型,把答案返回给前端显示。第一版我做得很直接:前端发请求,后端调模型的 chat.completions.create,等模型把答案完整生成完,把这一整段答案一次性返回给前端,前端拿到后整段渲染出来。本地一测——能用,答案也对。可真正给同事试用,问题立刻就来了:同事点了"发送",然后对着一片空白的屏幕,干等。等了 3 秒,屏幕还是空的;等到 8 秒,他以为卡死了,伸手就想刷新页面;直到第 十几秒,答案才""地一下整段蹦出来。他问我:"你这个是不是卡了?"——可它没卡,它只是在老老实实地生成。我盯着这个"等十几秒、然后整段蹦出"的体验想了很久才想明白,第一版错在一个根本的认知上:我以为"等模型生成完,一次性返回,就行了"。这个想法,在逻辑上没错,但它忽略了一件事:大模型生成一段长答案,是要花时间的——它是一个字一个字(更准确说,一个 token 一个 token)往外的,蹦完整段可能要十几秒。而我的第一版,是攒着:模型蹦出来的字,我一个都不给用户看,非要等它全部蹦完,才一次性端上去。在这十几秒里,用户那边什么反馈都没有,他无从判断系统到底是在干活还是已经死了。可你想想:模型在第 1 秒,其实就已经生成出开头的几个字了——这几个字,本可以立刻显示给用户看。正确的做法,是流式输出:模型蹦一个字,我就推一个字给前端,前端蹦一个字,就显示一个字。用户在第 1 秒就能看到答案开始一个字一个字地往外冒——这正是你在所有主流 AI 产品里看到的那个"打字机"效果。它不会让总时间变短,但它让用户从第一刻起就确信"系统在为我工作"。我以为流式不过是"加个参数",结果真做下来,坑一个接一个。这篇文章就把它梳理一遍:为什么"等生成完再返回"体验这么差、流式输出的本质是什么、模型的流式 API 怎么用、怎么用 SSE 把流推给前端、前端怎么接收,以及流的中断、出错、首字延迟这些把流式真正做对要避开的坑。

问题背景

先把那次的现象和我的误判讲清楚,后面所有的设计都是冲着纠正这个误判去的。

现象:一个 AI 对话产品,后端等大模型把答案完整生成完,再一次性返回前端。用户点"发送"后,盯着空白屏幕干等十几秒,中途完全没有任何反馈,常常以为卡死了想刷新页面,直到最后答案整段蹦出

我当时的错误认知:"等模型生成完,把完整答案一次性返回,这是最简单也最正确的做法。"

真相:大模型是一个 token 一个 token 逐步生成的,生成一整段长答案要十几秒。"等生成完再返回"等于把已经生成好的开头也一起扣住,白白让用户对着空屏幕焦虑。正确的做法是流式输出:模型每生成一小段,就立刻推送给前端显示。技术上,模型 API 提供 stream=True 逐块返回;后端用 SSE(Server-Sent Events) 把这些块持续推给前端;前端边收边渲染,形成"打字机"效果。

要把流式输出做对,需要几块认知:

  • 为什么"等生成完再返回"体验差——它扣住了已生成的开头;
  • 流式输出的本质——边生成边推送,让反馈尽早到达;
  • 模型的流式 API 怎么用,返回的 chunk 长什么样;
  • 怎么用 SSE 把流从后端推到前端,前端怎么接收;
  • 流的中断、中途出错、首字延迟这些工程坑怎么处理。

一、为什么"等生成完再返回"体验这么差

先把这件最根本的事钉死:大模型生成长文本是逐 token、耗时十几秒的过程;"等全部生成完再返回",等于把早已生成好的开头也一起扣留,让用户在最该获得反馈的前十几秒里一无所知。

下面这段代码,就是我那个"用户对着空屏幕干等"的第一版——它是阻塞式的:

from openai import OpenAI

client = OpenAI()


def chat_blocking(question: str) -> str:
    # 反面教材:阻塞式调用,等模型把答案【全部生成完】才返回。
    resp = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": question}],
    )
    # 这一行会一直【卡住】,直到模型生成完整段答案 ——
    # 可能是十几秒。这十几秒里,调用方什么都拿不到。
    return resp.choices[0].message.content
    # 问题:模型在第 1 秒其实已生成出开头几个字,
    # 但这里非要等到第 15 秒整段生成完,才一次性交出去。
    # 用户那十几秒,对着空屏幕,无从判断系统是死是活。

这段代码没有任何语法错误,逻辑也完全正确——它确实能拿到完整答案。它的问题不在对错,而在一个错误的时间观:它把"生成答案"当成了一个瞬间完成的动作,于是"等它完成再返回"听起来天经地义。可"生成答案"不是一个瞬间动作,它是一个持续十几秒的过程。在这个过程里,答案是一点一点变长的:第 1 秒可能有 10 个字,第 5 秒有 100 个字,第 15 秒才完整。chat_blocking 的做法,相当于守着一个正在出菜的厨房,前面的菜一道道都好了,却偏要等最后一道也上齐,才把整桌菜一起端给客人——客人在空桌子前坐了很久,而其实第一道菜早就能吃了。问题的根子清楚了:你不该攒着,你该好一道、上一道

二、流式输出的本质:边生成边推送

上一节的死结是:阻塞式调用扣住了那些早已生成好的内容。流式输出(Streaming)的破局点就一句话:不要等,模型生成出多少,就立刻往前端推多少。

它的运作方式,和阻塞式形成鲜明对比。阻塞式是"一锤子买卖":请求-等待-一次性响应,响应只发生一次。而流式是"细水长流":一次请求,后端会持续不断地往回送许多个小数据块,直到答案完整。用户看到的,就是答案一个字一个字往外冒的"打字机"效果。

这里要想清楚一件事:流式并不会让答案生成得更快——模型生成完整段答案,该花十几秒还是十几秒,流式一秒都省不了。它改变的是另一个东西,一个对体验极其关键的指标:首字延迟(从用户发出请求,到他看到第一个字的时间)。阻塞式的首字延迟,等于整段生成的总时间(十几秒)——因为用户要等到最后才一次看到全部。流式的首字延迟,只等于模型生成出头几个字的时间(可能不到 1 秒)。用户的焦虑,几乎全部来自这个"看到第一个字之前"的空白期。流式做的,就是把这个空白期从十几秒压缩到几乎为零。它没有缩短总时长,但它极大地缩短了"感觉上的等待"。理解了这个,剩下的就是工程问题——第一步是:怎么从模型那里,拿到这个"一点一点的流"?

三、模型的流式 API:逐块拿到生成结果

好消息是,主流大模型的 API 原生就支持流式。你不需要自己做什么特殊处理,只要在调用时多加一个参数:stream=True。加上它之后,API 的返回值就变了——它不再是一个完整的响应对象,而变成一个可以迭代的流:你用 for 循环去迭代它,每一轮拿到一个小数据块(chunk),块里装着模型刚刚新生成的那一小段文字。

def stream_from_model(question: str):
    """开启 stream=True,逐块拿到模型生成的内容。"""
    stream = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": question}],
        stream=True,                  # 关键:开启流式
    )
    for chunk in stream:              # 迭代这个流,一块一块地拿
        # 每个 chunk 里,delta.content 是这一块【新增】的文字。
        # 注意:它可能是 None(比如流的第一块或最后一块),要判一下。
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta               # 把这一小段文字交出去

这里有个关键细节:每个 chunk 里的 delta.content,是这一块新增的文字,不是到目前为止的全部文字。也就是说,你把每一块的 delta 依次拼接起来,才得到完整答案。还有,delta.content 可能是 None(比如流的第一块往往只带角色信息、最后一块只带结束标记),所以迭代时必须判空,否则会报错。现在,后端已经能一块一块地拿到生成结果了。下一个问题是:怎么把这个"后端的流",同样流式地送到前端?

四、用 SSE 把流推送到前端

后端有了流,要把它持续地送到前端,需要一种支持"服务端持续往客户端推数据"的传输方式。最合适的就是 SSE(Server-Sent Events,服务器发送事件)。SSE 是 HTTP 的一种用法:客户端发起一个普通请求,但这个连接不立刻关闭,服务端可以源源不断地往这条连接里写数据,客户端边收边处理。它天生就是为"服务端推流"设计的,而且比 WebSocket 简单得多

SSE 的数据有一个固定格式:每一条消息,以 data: 开头,以两个换行符 \n\n 结尾。下面用 Flask 写一个 SSE 接口:

import json
from flask import Flask, request, Response

app = Flask(__name__)


@app.route("/chat/stream")
def chat_stream():
    question = request.args.get("q", "")

    def event_generator():
        # 把模型流里的每一小段,按 SSE 格式("data: ...\n\n")往外发
        for delta in stream_from_model(question):
            payload = json.dumps({"content": delta}, ensure_ascii=False)
            yield f"data: {payload}\n\n"        # SSE 单条消息的格式
        # 全部发完,再发一条特殊消息,明确告诉前端"结束了"
        yield 'data: {"done": true}\n\n'

    # mimetype 必须是 text/event-stream,前端才会按 SSE 来解析
    return Response(event_generator(), mimetype="text/event-stream")

这段代码里有两个不能漏的点。第一,Responsemimetype 必须text/event-stream——这是 SSE 的标志,前端和浏览器靠它识别这是一个流。第二,在所有内容发完后,要额外发一条done 标记的消息。因为流式响应没有传统响应那种明确的"响应体到此结束",前端无从知道"是真的发完了,还是网络卡了一下"。所以你得自己约定一个"结束信号",显式地告诉前端"到此为止"。后端的流推出去了,接力棒交到前端手里。

五、前端怎么接收并渲染这个流

前端接收 SSE 流,有两种常见方式。第一种是浏览器原生EventSource 对象,它专门用来消费 SSE,用起来最简单:

function chatWithEventSource(question) {
  // EventSource 是浏览器原生的 SSE 客户端
  const es = new EventSource(`/chat/stream?q=${encodeURIComponent(question)}`);
  let answer = "";

  es.onmessage = (event) => {
    const data = JSON.parse(event.data);
    if (data.done) {           // 收到约定的结束信号
      es.close();              // 关键:必须手动关闭,否则会一直重连
      return;
    }
    answer += data.content;    // 把这一小段拼接到已有答案上
    document.getElementById("answer").textContent = answer;  // 实时渲染
  };

  es.onerror = () => {
    es.close();                // 出错也要关闭,避免无限重连
  };
}

EventSource 简单,但有局限:它只支持 GET 请求,也不能自定义请求头(比如带认证 token 就不方便)。如果需要 POST、需要带请求头,就得用第二种方式——fetch 配合 ReadableStream 手动读取流:

async function chatWithFetch(question) {
  const resp = await fetch("/chat/stream", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ q: question }),
  });
  const reader = resp.body.getReader();       // 拿到流的读取器
  const decoder = new TextDecoder();
  let answer = "";

  while (true) {
    const { value, done } = await reader.read();   // 一块一块地读
    if (done) break;                               // 流读完了
    const text = decoder.decode(value);            // 字节解码成文字
    // SSE 每条消息以 \n\n 分隔,逐条解析出来
    for (const line of text.split("\n\n")) {
      if (line.startsWith("data: ")) {
        const data = JSON.parse(line.slice(6));
        if (!data.done) {
          answer += data.content;
          document.getElementById("answer").textContent = answer;
        }
      }
    }
  }
}

两种方式,各有取舍:EventSource 省心,但只能 GET、不能带头;fetch + ReadableStream 灵活,支持 POST 和自定义头,但要自己解析 SSE 格式。简单场景用前者,需要鉴权、传复杂参数就用后者。到这里,一条从模型用户屏幕的完整流式链路就通了。但要把它真正用在生产上,还有几个绕不开的工程坑。

六、工程坑:中断、错误处理与首字延迟

流式链路通了,但有几个工程坑,不处理就会在生产上出事。坑 1:用户中途关页面,后端要能感知并停止。流式响应往往持续十几秒,这期间用户完全可能关掉页面点了"停止"。如果后端察觉不到,它会继续调模型、继续生成——而这些生成没人要了,纯属烧钱。后端要能感知连接已断开,一旦断开就停止迭代模型流:

@app.route("/chat/stream")
def chat_stream_safe():
    question = request.args.get("q", "")

    def event_generator():
        try:
            for delta in stream_from_model(question):
                payload = json.dumps({"content": delta},
                                     ensure_ascii=False)
                yield f"data: {payload}\n\n"
            yield 'data: {"done": true}\n\n'
        except GeneratorExit:
            # 客户端断开连接时,这个生成器会被关闭,抛出 GeneratorExit。
            # 捕获它 = 知道"用户不要了",就此打住,别再调模型烧钱。
            print("客户端已断开,停止生成")
            raise
        except Exception as e:
            # 坑 2:流【已经开始】之后才出错,要把错误也当成一条消息发出去
            err = json.dumps({"error": str(e)}, ensure_ascii=False)
            yield f"data: {err}\n\n"

    return Response(event_generator(), mimetype="text/event-stream")

坑 2:流中途出错,不能简单抛 500。这是流式特有的难点。普通接口出错,你可以直接返回 HTTP 500。但流式响应,在出错时 HTTP 状态码早已发出去了(在第一个 chunk 发出时,状态码就是 200 了),你没法再改成 500。所以流开始之后的错误,只能当成流里的一条特殊消息发给前端(上面的 {"error": ...}),由前端识别这条消息并做相应提示。前端的解析逻辑也要相应地处理这个 error 字段:

function handleSSEData(raw, onText, onError, onDone) {
  // 前端统一处理一条 SSE 消息:正常内容 / 错误 / 结束,分别对待
  const data = JSON.parse(raw);
  if (data.error) {            // 流中途出错:模型超时、内容被拦截等
    onError(data.error);       // 给用户一个明确的失败提示
  } else if (data.done) {      // 收到结束信号
    onDone();
  } else {
    onText(data.content);      // 正常的一小段内容,追加渲染
  }
}

坑 3:首字延迟,是流式体验的生命线。流式的全部价值,就在于首字延迟短。所以任何挡在"第一个字"前面的耗时操作都要警惕:比如你在调模型之前,先做了一次耗时的数据库查询RAG 检索——这些时间会全部加到首字延迟上,流式的优势就被抵消了。能提前做的别拖到请求里,能并行的别串行。坑 4:别忘了把缓冲关掉。有些 Web 服务器或反向代理(如 Nginx)默认会缓冲响应——它会攒一批数据再一起发,这会让你精心设计的流式又变回"一坨一坨"。要在响应头里加 X-Accel-Buffering: no 之类的设置,显式关掉缓冲坑 5:完整答案要在服务端自己拼。如果你需要把这次对话存进数据库,记住模型是分块发的,前端拿到的是一段段碎片,后端不能依赖前端回传完整答案。正确做法是后端一边推流、一边把每块 delta 拼接成完整答案,流正常结束后再落库:

@app.route("/chat/stream")
def chat_stream_persist():
    question = request.args.get("q", "")

    def event_generator():
        parts = []                       # 一边推流,一边把每块攒进来
        try:
            for delta in stream_from_model(question):
                parts.append(delta)      # 关键:服务端自己留一份
                payload = json.dumps({"content": delta},
                                     ensure_ascii=False)
                yield f"data: {payload}\n\n"
            # 流【正常走完】才拼接落库 —— 中途断开就不存半截答案
            full_answer = "".join(parts)
            save_conversation(question, full_answer)
            yield 'data: {"done": true}\n\n'
        except GeneratorExit:
            # 用户中途断开:已生成的半截答案要不要存,按业务定
            raise

    return Response(event_generator(), mimetype="text/event-stream")

下面这张图,把一次流式问答的完整链路串起来:

关键概念速查

概念 / 手段 说明
阻塞式返回的问题 等模型整段生成完才返回,扣住了早已生成的开头,用户对着空屏幕干等
流式输出 模型生成多少就立刻推多少,用户看到答案逐字往外冒的打字机效果
首字延迟 从请求到看到第一个字的时间,流式把它从十几秒压缩到几乎为零
stream=True 模型 API 开启流式,返回值变成可迭代的流,逐块拿到生成内容
delta 是增量 每个 chunk 的 delta.content 是新增文字不是全部,要依次拼接,且可能为 None
SSE 服务端推送 一个连接持续往客户端写数据,格式是 data 开头两个换行结尾
结束信号 流式没有天然的响应结束标志,要自己约定一条 done 消息显式告知
EventSource 浏览器原生 SSE 客户端,简单但只支持 GET、不能自定义请求头
fetch 读流 fetch 配合 ReadableStream 手动读流,支持 POST 和自定义头但要自己解析
流中途出错 状态码已发出无法改 500,只能把错误当成流里一条特殊消息发给前端

避坑清单

  1. 别用阻塞式等模型整段生成完再返回,这会扣住早已生成的开头,让用户对着空屏幕干等十几秒。
  2. 流式输出不会缩短总生成时间,但能把首字延迟从十几秒压到几乎为零,大幅改善体验感受。
  3. 模型 API 加 stream=True 即开启流式,返回值变成可迭代的流,用 for 循环逐块取。
  4. 每个 chunk 的 delta.content 是新增增量不是全部,要依次拼接,且它可能为 None 必须判空。
  5. 用 SSE 推流,响应 mimetype 必须是 text/event-stream,消息格式是 data 开头两个换行结尾。
  6. 流式没有天然的结束标志,后端必须额外发一条 done 消息,前端才知道是真发完了。
  7. 前端简单场景用 EventSource,需要 POST 或自定义请求头则用 fetch 配合 ReadableStream。
  8. 用户中途关页面,后端要捕获 GeneratorExit 感知断开并停止生成,否则会继续调模型烧钱。
  9. 流开始后才出错无法再返回 500,只能把错误当成流里一条特殊消息发出,由前端识别提示。
  10. 首字延迟是流式的生命线,别在调模型前堆耗时操作,并关掉 Nginx 等代理的响应缓冲。

总结

回头看那次"用户对着空白屏幕等了十几秒、以为卡死"的事故,以及我后来在流式上接连踩的坑,最该记住的不是某一段 SSE 代码,而是我动手前那个想当然的判断——"等模型生成完,一次性返回就行"。这句话错在它把"生成一段答案"看成了一个不可分割的、瞬间的整体。可它根本不是。一段长答案的生成,是一个有过程、有先后、持续十几秒的事件——开头的字早早就生成好了,结尾的字很晚才出来。我的第一版,做的是一件很别扭的事:它明明在第 1 秒就握着答案的开头,却硬要把它攥在手里,陪着用户一起干等到第 15 秒,才一次性松手。流式输出想清楚的,正是这件事:既然内容是陆续生成出来的,那就陆续地交付出去——生成的节奏,就应该是交付的节奏。不要在中间设一个水库,把流动的水蓄起来,非等蓄满了才开闸放一次

所以做流式,真正的工程量不在"stream=True 加上去"那一下。那一下,API 帮你做完了。真正的工程量,在于你意识到:流式改变的不只是一个参数,而是整条链路的"形态"。阻塞式是一问一答,流式是一问、多答、再有个结束。这个形态的改变,会顺着链路一路传导,逼着每个环节都跟着变:传输层,你不能再用普通响应,得用 SSE 这种能持续推的;协议上,你得自己造一个"结束信号",因为流没有天然的终点;错误处理上,你得接受一个反直觉的事实——流一旦开始,你就再没机会返回 500 了,出错只能混在流里说;连用户中途反悔这种事,你都得认真处理,否则就是对着空气烧钱。这篇文章的几节,其实就是顺着这条"形态改变"展开的:先想清楚阻塞式差在哪、流式的本质是什么,再看模型流式 API、SSE 推送、前端接收这三段主干,最后是中断、出错、首字延迟这几个把流式真正做对的工程细节。

你会发现,流式输出的思路,和现实里一个好的服务者怎么对待"让人等待"这件事,完全相通。你去一家餐厅,菜要做二十分钟——一个糟糕的餐厅,会让你对着空桌子干坐二十分钟,期间没有任何人理你,你不知道厨房到底在不在做你的菜,你越坐越慌。一个的餐厅会怎么做?它会先给你上茶水、上小菜,会告诉你"您的菜在做了,大概还要十五分钟",会每道菜一好就先端上来。它并没有把那二十分钟变短——菜该做多久还是多久——但它让你每时每刻都知道"我没有被遗忘,事情在推进"。流式输出做的,就是这件事:它治不了"生成需要时间"这个客观事实,但它能治"等待时的那种被遗弃的焦虑"。技术产品的体验,很多时候就赢在这种地方——不是真的更快,而是让用户感觉到了被尊重、被告知

最后想说,流式做没做扎实,差距永远不会在 Demo 里暴露——Demo 里你自己问一个简短的问题,模型两秒就答完了,有没有流式看起来差不多。它只在真实的、用户会问复杂问题、模型要生成长答案、网络还时好时坏的生产环境里才显形。那时候它会用最直接的方式给你结账:做不好,你会像我一样,看着用户对着空屏幕一脸茫然,看着他们不耐烦地反复刷新,看着跳出率居高不下——你的产品明明能力不差,却因为"看起来像卡死了"而被用户放弃。而做了,用户点下"发送"的那一瞬间,答案就开始一个字一个字地、温和地往外流淌,他从第一秒起清清楚楚地看到"系统在为我认真工作"——哪怕完整答案同样要等十几秒,这十几秒的感受,已经天差地别。所以别等用户"以为卡死了"的反馈找上门,在你写下第一行"调用大模型"的代码时就该想清楚:模型生成这段答案,要花多久?这段时间里,用户那边看得到反馈吗?他能分清"系统在干活"和"系统死了"吗?这几个问题都有了答案,你的 AI 产品才不只是 Demo 里那个答得出问题的样子,而是一个无论问题多复杂、答案多长,都能让用户从第一秒起就安心的可靠产品。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

缓存穿透、击穿、雪崩完全指南:从一次"加了 Redis,数据库还是被打挂"看懂缓存防护

2026-5-21 21:48:12

技术教程

限流算法完全指南:从一次"固定窗口在临界点放进双倍流量、服务被打挂"看懂四种限流

2026-5-21 21:59:50

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索