LLM 流式输出完全指南:从一次"用户点了发送对着空白屏幕等十几秒"看懂为什么 AI 对话必须用流式

2024 年我做一个网页端的 AI 对话功能用户输入一个问题后端调用大模型把模型的回答显示出来这件事我没多想就有了方案后端调用模型拿到完整的回答字符串返回给前端前端显示出来第一版我做得很顺手后端一个接口里面调模型的 API 等它返回那段完整的文本把文本塞进 JSON 响应里返回前端拿到响应把文本渲染到对话框本地拿几个短问题一测问今天星期几这种一秒不到就出来了我心里很笃定调模型嘛不就是发个请求等个响应拿到字符串显示出来可等真实用户开始问那些需要长篇回答的问题一串问题冒了出来第一种最先把我打懵用户问一个复杂问题点了发送之后对话框里十几秒什么都没有就一个转圈用户以为卡死了刷新重发甚至直接走了第二种最难缠回答越长等得越久第三种最头疼偶尔模型生成到一半网络断了我的接口直接超时报错前面已经生成的大半段回答也全丢了第四种最莫名其妙用户问到一半发现问错了想停下来可我的接口根本没有中途停止这个概念我盯着这一连串问题想了很久才彻底想明白第一版错在一个根本的认知上我以为调用大模型就和调用一个普通的后端接口一样可大模型是逐 token 生成的你用等完整响应的方式调它就是硬生生把这个逐字涌出的过程憋成了一次漫长的等待本文从头梳理为什么等生成完再返回的体验是错的怎么用 SSE 把 token 边生成边推给前端流式下的错误处理为什么不一样用户中途取消该怎么做以及一些把它做扎实要避开的工程坑

2024 年我做一个网页端的 AI 对话功能——用户输入一个问题,后端调用大模型,把模型的回答显示出来。这件事我没多想,就有了方案:后端调用模型,拿到完整的回答字符串,返回给前端,前端显示出来。第一版我做得很顺手——后端一个接口,里面调模型的 API,等它返回那段完整的文本,把文本塞进 JSON 响应里返回;前端拿到响应,把文本渲染到对话框。本地拿几个短问题一测,问"今天星期几"这种,一秒不到就出来了,我心里很笃定:调模型嘛,不就是发个请求、等个响应、拿到字符串、显示出来,这对话功能稳了。可等真实用户开始问那些需要长篇回答的问题,一串问题冒了出来。第一种最先把我打懵:用户问一个复杂问题,点了发送之后,对话框里十几秒什么都没有,就一个转圈,用户以为卡死了,刷新、重发,甚至直接走了。第二种最难缠:回答越长等得越久,有的问题模型要生成七八秒,用户对着空白屏幕的七八秒,长得像一个世纪。第三种最头疼:偶尔模型生成到一半网络断了,我的接口直接超时报错,用户那条问题等于白问,前面已经生成的大半段回答也全丢了。第四种最莫名其妙:用户问到一半发现问错了,想停下来,可我的接口是"要么拿到完整结果、要么报错",根本没有"中途停止"这个概念,用户只能干等它生成完。我盯着这一连串问题想了很久,才彻底想明白:第一版错在一个根本的认知上。我以为调用大模型,就和调用一个普通的后端接口一样——发一个请求,等一个完整的响应,拿到结果再处理。可大模型根本不是"想好整段答案,然后一次性吐出来"的——它是逐 token(可粗略理解为逐字、逐词)生成的,第一个字和最后一个字之间,隔着实实在在的好几秒。你用"等完整响应"的方式调它,就是硬生生把这个"逐字涌出"的过程,憋成了一次漫长的、用户面对空白屏幕的等待。要把 AI 对话的体验做扎实,根上要明白:大模型的输出是一个"流",你要做的不是"等流结束再处理",而是"让这个流一边生成、一边流到用户眼前"。本文从头梳理:为什么"等生成完再返回"的体验是错的,怎么用 SSE 把 token 边生成边推给前端,流式下的错误处理为什么不一样,用户中途取消该怎么做,以及一些把它做扎实要避开的工程坑。

问题背景

先把大模型的输出方式说清楚。一个大模型,本质是逐 token 生成文本的程序:它生成完第一个 token,接着生成第二个,再第三个……一个长回答,可能由几百上千个 token 组成,整个生成过程要持续好几秒甚至更久。模型的 API 通常提供两种调用方式:一种是非流式,服务端把所有 token 都生成完,打包成一个完整响应一次性返回;另一种是流式,服务端每生成一个或几个 token,就立刻通过一个持续的连接把这一小段推送出来。

错误认知是:调用大模型和调用普通接口没区别,发请求、等响应、拿结果。真相是:普通接口的处理时间通常是几十毫秒,"等"几乎无感;而大模型生成一个长回答要好几秒,这几秒的"等",用户全程面对一个空白屏幕,体验是灾难性的。把这一点摊开,第一版的几类问题就都能解释了:

  • 长时间空白等待:非流式调用必须等全部 token 生成完才返回,用户在这期间只能看着转圈,无从知道系统是在工作还是卡死了。
  • 回答越长越难熬:等待时间和回答长度成正比,长回答把"无反馈的等待"拉得更长。
  • 中途失败全盘皆输:非流式下,生成到一半的失败会让整个响应报错,已经生成的大半段内容全部丢失。
  • 无法中途停止:非流式调用是一个"原子"的请求,没有"取消正在进行的生成"这个能力。

所以让 AI 对话体验做对,核心不是把后端接口写得更快,而是改变调用模型的范式——把"等一个完整响应"改成"消费一个持续的流"。下面六节,就从第一版"等生成完再返回"的错讲起。

一、为什么"等生成完再返回"的体验是错的

第一版的接口,逻辑上完全没毛病:调模型、拿结果、返回。它的问题不在逻辑,在"时间感"。一个普通的后端接口,比如查个数据库,从收到请求到返回响应,可能就几十毫秒,用户根本感知不到中间的等待。第一版是把"调用大模型"也当成了这样一个接口。可大模型生成一个长回答,从第一个 token 到最后一个 token,要持续好几秒。在非流式调用下,这好几秒里,后端在默默地等模型,前端在默默地等后端,用户在默默地等前端——整条链路上,没有任何东西流动到用户眼前。

# 反面教材:等模型把整段答案生成完,才一次性返回

def chat_blocking(client, question: str) -> dict:
    # stream 默认为 False:这一行会一直阻塞,
    # 直到模型把几百个 token 全部生成完才返回
    resp = client.chat.completions.create(
        model="some-model",
        messages=[{"role": "user", "content": question}],
    )
    full_text = resp.choices[0].message.content
    # 这中间的 5 到 10 秒,用户的屏幕上什么都没有
    return {"answer": full_text}

# 前端拿到这个接口的响应时,回答已经"凭空出现"了一整段
# 但用户为这一整段,对着空白屏幕干等了好几秒

这里的关键,是要看清"模型其实早就开始产出了"。模型生成到第 1 个 token 时,这个字就已经可以给用户看了;生成到第 50 个 token 时,前面 50 个字早就是确定的、可以展示的内容了。非流式调用,等于是把这些"已经生成好、本可以立刻展示"的内容,生生攒着、压着,非要等到最后一个 token 落地,才一股脑放出来。用户本来可以在第一秒就看到回答开头,却被迫等到第八秒才看到全部——而那"全部",他其实是从开头一个字一个字读的。

这一节要建立的认知是:大模型的输出在"时间"这个维度上是展开的——它不是一个"瞬间产生的结果",而是一个"持续好几秒的过程";而非流式调用,把这个有时间长度的过程,强行压缩成了一个"点",代价就是用户面对空白屏幕的全部等待。普通接口我们不关心"过程",因为它的过程短到可以忽略,只有"结果"有意义。但大模型不一样,它的过程有实实在在的时间长度,长到"过程"本身就值得被用户看见。"等生成完再返回",错就错在它只承认"结果"、抹掉了"过程"。把这层认知摆正,解法就清楚了:不要再把模型调用当成一个求"结果"的函数,要把它当成一个"会持续产出的源头",你的任务是把这个源头产出的东西,实时地导流到用户眼前。这就是流式。

二、SSE:把 token 一边生成一边推给前端

要把模型"边生成边产出"的特性传递到用户眼前,需要解决两个环节。第一个环节,是后端怎么从模型那里"边生成边拿"。模型 API 基本都支持流式模式,打开它,API 调用返回的不再是一个完整字符串,而是一个可以迭代的"流"——你每次迭代,就拿到模型刚生成出来的一小段。

# 后端:用流式模式调模型,边生成边往下游推

def chat_stream(client, question: str):
    # stream=True:返回一个可迭代对象,而不是完整结果
    stream = client.chat.completions.create(
        model="some-model",
        messages=[{"role": "user", "content": question}],
        stream=True,
    )
    for chunk in stream:
        # 每个 chunk 是模型刚生成出来的一小段(可能是几个字)
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta      # 立刻把这一小段交出去,不攒着

第二个环节,是后端怎么把这些一小段一小段的内容,实时地"推"给前端。普通的 HTTP 响应是"一次性"的,不适合这种持续推送。这里要用的技术叫 SSE(Server-Sent Events,服务器发送事件):它基于 HTTP,让服务端可以通过一个一直不关闭的连接,持续地往前端发送一段一段的数据。后端要做的,是把上面那个流里的每一小段,按 SSE 的格式包装好发出去。

# 后端:把模型的流,用 SSE 格式持续推送给前端
# 这里以 FastAPI 为例

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import json

app = FastAPI()

@app.get("/chat")
def chat(question: str):
    def event_generator():
        for delta in chat_stream(client, question):
            # SSE 规定:每条消息以 data: 开头,以两个换行结尾
            yield f"data: {json.dumps({'delta': delta})}\n\n"
        # 用一条特殊消息显式告诉前端:流结束了
        yield "data: [DONE]\n\n"

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",   # SSE 的固定 MIME 类型
    )

前端这边,用浏览器原生的 EventSource,或者用 fetch 读取响应流,就能一段一段地收到后端推来的内容,收到一段就往对话框里追加一段。这样,用户在点下发送的一秒内,就能看到回答的开头冒出来,然后文字像打字机一样持续涌现。

// 前端:用 fetch 读取流,收到一段就追加一段

async function streamChat(question, onDelta) {
  const resp = await fetch(`/chat?question=${encodeURIComponent(question)}`);
  const reader = resp.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;                          // 流读完了
    const text = decoder.decode(value, { stream: true });
    // 按 SSE 格式,逐行解析出 data: 后面的内容
    for (const line of text.split("\n\n")) {
      if (!line.startsWith("data: ")) continue;
      const payload = line.slice(6);
      if (payload === "[DONE]") return;
      onDelta(JSON.parse(payload).delta);     // 把这一小段渲染上去
    }
  }
}

这一节的认知是:流式不是某一处的改动,而是要打通"模型 → 后端 → 前端 → 用户眼睛"这一整条链路,让数据在每一个环节都"不积压、即产即走"。很多人以为流式就是后端加个 stream=True 那么简单,其实那只是第一环。如果后端从模型拿到了流,却还是攒齐了才返回,那前端依然是空等;如果后端用 SSE 推了,前端却还是用普通方式等整个响应,那也白搭。流式的本质,是一种"传送带"思维:模型把一小段放上传送带,后端不拦截、直接传给下一段,前端收到就立刻展示。这条传送带上任何一个环节"想攒一攒再走",整条流式就退化回了非流式。理解了这是一条"端到端的链路",你排查流式不流畅时,就会一个环节一个环节地查,而不是只盯着某一处。

三、流式下的错误处理:错误可能发生在中途

非流式调用的错误处理很简单:要么整个成功、拿到结果,要么整个失败、抛个异常,二选一。但流式不一样。流式下,你已经成功地把回答的前半段推给用户了,然后,模型那边可能在生成后半段时出错了——网络抖动、模型服务超时、触发了内容安全限制。这时候,前半段已经在用户屏幕上了,你不能装作无事发生,也不能让它就那么停在半句话上。第一版"生成到一半断了整个报错"的问题,根子就在没把"中途失败"当回事。

# 流式下:错误可能发生在已经推了一部分内容之后

import json

def safe_event_generator(client, question: str):
    sent_any = False
    try:
        for delta in chat_stream(client, question):
            sent_any = True
            yield f"data: {json.dumps({'delta': delta})}\n\n"
        yield "data: [DONE]\n\n"
    except Exception as e:
        # 关键:此时前半段可能已经发出去了,
        # 不能再抛 HTTP 500(响应头早就发完了,改不了状态码),
        # 只能在流里发一条"错误事件",让前端知道出了事
        err = {"error": True, "message": "生成中断,请重试"}
        yield f"data: {json.dumps(err)}\n\n"
        # sent_any 告诉前端:是从头就失败,还是中途断的

这里有个绕不开的技术细节:HTTP 响应的状态码和响应头,是在连接刚建立、还没发任何数据时就确定的。一旦你开始往流里写第一段数据,状态码就已经是 200 了,改不回去。所以流式下,中途发生的错误,没法用"返回一个 500 状态码"来表达——你只能在流的内部,发一条约定好格式的"错误消息",前端解析到这条消息,就知道生成中断了,然后据此处理:可能是在已生成的内容后面加一行"(回答因故中断)",并给一个重试按钮。

这一节的认知是:流式把"成功还是失败"这个二元判断,变成了一条有时间线的过程——失败可以发生在"已经成功了一半"之后,而你的错误处理必须能表达这种"半成功"的状态。非流式的世界很干净:结果要么有、要么没有。流式的世界有了"中间态":回答已经存在了一部分,但没能完成。你的错误处理设计,必须正视这个中间态——前端要能区分"一个字都没出来就失败了"(可以直接整体重试)和"出来一大半才断的"(要保留已有内容、提示中断、给重试)。这两种情况,对用户的意义完全不同。把流式的错误当成"过程中的事件"而不是"最终的结果",你才能给用户一个体面的、不丢内容的失败体验。

把流式对话从发起到结束的完整路径画出来,就是下面这张图:

[mermaid]
flowchart TD
A[用户发送问题] --> B[后端以流式模式调用模型]
B --> C{模型产出了新的一段吗}
C -->|是| D[按 SSE 格式推给前端]
D --> E[前端追加渲染这一段]
E --> C
C -->|生成完毕| F[发送结束标记 DONE]
C -->|中途出错| G[在流里发送错误事件]
C -->|用户取消| H[中断模型调用 停止计费]

四、取消与中断:用户中途关掉,要及时停止生成

第一版还有一个被完全忽略的能力:取消。用户问了个问题,模型开始长篇大论地生成,可用户看了开头两句,发现这不是他想问的,或者答案已经够了,他想停下来。非流式调用里,根本没有"停"这个概念——请求发出去了,就只能等它结束。但流式天然就具备"可中断"的能力,因为它本来就是一个持续的连接,你随时可以把这个连接断掉。

前端要做的,是用一个可以中止的请求。现代浏览器的 fetch 配合 AbortController,可以在用户点"停止"按钮时,主动中止这个流式请求。

// 前端:用 AbortController 让用户能中途停止生成

let currentController = null;

async function streamChat(question, onDelta) {
  currentController = new AbortController();
  const resp = await fetch(`/chat?question=${encodeURIComponent(question)}`, {
    signal: currentController.signal,        // 把中止信号挂上
  });
  const reader = resp.body.getReader();
  // ……读流、渲染,逻辑同前……
}

// 用户点"停止"按钮时调用
function stopGenerating() {
  if (currentController) {
    currentController.abort();              // 中止请求,连接断开
    currentController = null;
  }
}

但前端中止只是一半。更关键的一半在后端:前端断开连接后,后端必须感知到这个断开,并真的停止对模型的调用。否则,前端虽然不看了,后端还在傻乎乎地让模型继续生成——而模型生成是按 token 收费的,这些用户根本不会看到的 token,会实实在在地计入账单。后端要在每次往流里写数据时,检查客户端连接是否还在。

# 后端:感知到前端断开后,停止生成,别再烧 token

from fastapi import Request

@app.get("/chat")
async def chat(question: str, request: Request):
    async def event_generator():
        stream = client.chat.completions.create(
            model="some-model",
            messages=[{"role": "user", "content": question}],
            stream=True,
        )
        for chunk in stream:
            # 每推一段前,先看前端连接还在不在
            if await request.is_disconnected():
                stream.close()      # 关掉与模型的流,停止继续生成
                break               # 用户不看了,后端也别再烧 token 了
            delta = chunk.choices[0].delta.content
            if delta:
                yield f"data: {json.dumps({'delta': delta})}\n\n"

    return StreamingResponse(event_generator(),
                             media_type="text/event-stream")

这一节的认知是:流式带来的不只是"更快看到内容"这一个好处,它还带来了一个非流式根本不具备的能力——"过程中的可控性",而取消,就是这种可控性最直接的体现。非流式调用是一颗"发射后不管"的子弹,打出去就收不回。流式则是一条全程都连着的线,这条线在,就意味着你随时可以对这个"正在进行的过程"施加控制——停止它、或者将来还能做到暂停它、调整它。但要注意,这个控制是"双向"的:前端断开,和后端停止,是两件必须都做到的事。只做前端中止,用户是清净了,但你的成本还在哗哗地流。把取消这件事做完整——前端能中止、后端能感知、模型调用能真的停——你才算真正拿到了流式给你的这份"可控性"。

五、流式与结构化输出:别等整段拼完才解析

前面讲的都是"把文本流给用户看"。但还有一类场景:模型的输出不是给用户直接看的自然语言,而是要你的程序去解析的结构化数据,比如一段 JSON。流式和结构化输出放在一起,会产生一个新的矛盾:JSON 必须是完整的才能被解析,而流式给你的,永远是一段不完整的、还在增长的文本。

处理这个矛盾,要分清两种情况。第一种,你最终需要的是解析后的完整 JSON 对象——那就老老实实把所有流式片段拼接起来,等流结束、拿到完整文本,再一次性解析。这种情况下,流式的价值不是"边解析边用",而仅仅是"让你能尽早知道生成在进行、能展示进度"。

# 结构化输出 + 流式:片段先拼起来,完整了再解析

import json

def stream_then_parse(client, question: str):
    buffer = ""
    for delta in chat_stream(client, question):
        buffer += delta              # 流式片段只负责拼接
        # 可以在这里更新一个"正在生成"的进度提示

    # 流结束,buffer 是完整文本了,这时才解析
    try:
        return json.loads(buffer)
    except json.JSONDecodeError:
        # 模型偶尔会输出不合法的 JSON,要兜底
        return {"error": "模型返回的不是合法 JSON", "raw": buffer}

第二种情况更进阶:你希望一边接收流、一边把已经完整的那部分用起来。比如模型在流式输出一个 JSON 数组,数组里每多一个完整的元素,你就想立刻处理一个。这需要一个能处理"不完整 JSON"的增量解析器,复杂度高不少。大多数业务,第一种"拼完再解析"就够了——不要为了"流式"两个字,硬给自己上增量解析的复杂度。

这一节的认知是:流式和结构化输出,代表两种不同的诉求——流式要的是"尽早、逐步地拿到内容",结构化要的是"拿到一个完整、合法的整体",这两者之间有天然的张力。面对这种张力,关键是先想清楚:这个场景里,用户或程序到底要的是什么?如果输出是给用户读的文本,流式的"逐步呈现"就是核心价值,直接流。如果输出是给程序解析的 JSON,那"完整合法"是硬要求,流式就只能退居二线,做个进度展示——该等拼完,就老实等拼完。最忌讳的,是不分场景,觉得"流式更高级"就处处上流式,结果给一个本该完整解析的 JSON 场景,套上一个复杂又脆弱的增量解析。技术选型要服从场景的真实诉求,而不是反过来。

六、把流式输出做扎实,要避开的工程坑

前面五节搭出了一套能流式、能处理错误、能取消的方案。但要在生产里真正用好,还有几个坑得专门讲。第一个最隐蔽:你后端代码写对了流式,中间的反向代理(比如 Nginx)却可能默默地把你的流"缓冲"起来——它攒够一大块才转发给前端。结果就是,后端明明一段一段地推,前端却还是一卡一卡地、一大坨一大坨地收到,流式体验荡然无存。

# 坑一:Nginx 默认会缓冲响应,把流式攒成一坨
# 在转发 SSE 接口的 location 里,显式关掉缓冲

location /chat {
    proxy_pass http://backend;

    proxy_buffering off;          # 关掉代理缓冲,收到就转发
    proxy_cache off;              # 关掉缓存
    proxy_set_header Connection '';
    proxy_http_version 1.1;
    chunked_transfer_encoding on;
}
# 后端也可以发一个 X-Accel-Buffering: no 响应头,
# 显式告诉 Nginx 这个响应不要缓冲

第二个坑,是流式下的用量统计。非流式调用,响应里通常直接带着"这次用了多少 token";但流式调用,token 用量信息往往在流的最后一个特殊片段里,或者需要你自己在后端把所有 delta 累加起来估算。如果你要按 token 计费、或要做用量监控,得专门在流式结束时把这个数据收集好。

# 坑二:流式下,token 用量要在流结束时专门收集

def chat_stream_with_usage(client, question: str):
    stream = client.chat.completions.create(
        model="some-model",
        messages=[{"role": "user", "content": question}],
        stream=True,
        # 部分 API 需要显式要求在流末尾带上用量统计
        stream_options={"include_usage": True},
    )
    usage = None
    for chunk in stream:
        if chunk.usage:                 # 用量信息常在最后一个 chunk
            usage = chunk.usage
        if chunk.choices and chunk.choices[0].delta.content:
            yield {"delta": chunk.choices[0].delta.content}
    # 流结束后,把 usage 记进日志,用于计费和监控
    if usage:
        yield {"usage": {"total_tokens": usage.total_tokens}}

还有几个坑值得点一下。其一,流式连接是长连接,要设置合理的超时——既不能太短(否则正常的长回答会被掐断),也要有个上限(防止异常连接永久挂着占资源)。其二,前端按"打字机效果"渲染时,如果每收到一个字就触发一次完整的 Markdown 重新渲染,长回答会越来越卡;可以做节流,攒一小批或隔几十毫秒渲染一次。其三,SSE 是单向的(只能服务端推、客户端收),如果你的场景需要双向实时通信,要考虑 WebSocket。下面把流式和非流式的关键差异集中对照一下:

流式 vs 非流式 对照

  维度            非流式               流式
  --------------------------------------------------------------
  首字延迟        等全部生成完          一秒内就能看到开头
  体验            长时间空白等待        打字机式逐步呈现
  错误处理        要么全成功要么全失败  可能在中途失败 要发错误事件
  取消能力        发射后不管 无法取消    可中途中断 停止计费
  实现复杂度      低 发请求等响应        高 要打通端到端的流
  适合场景        短输出 或要完整解析    给用户看的长文本回答

  口诀:给用户看的对话用流式,
        给程序解析的结构化数据按需选,别为流式而流式。

这一节这几个坑,串起来是同一个意思:流式不是"后端加个 stream=True"就完事的开关,它是一条贯穿模型、后端、代理、前端的链路,链路上任何一环没配合好,流式体验就会打折甚至失效。代理的缓冲会把流重新攒成坨;用量统计的方式变了;长连接的超时要重设;前端的渲染要做节流。这些坑没有一个在"调模型"这一步,它们散落在链路的各个环节。把流式当成一个端到端的系统工程,而不是一个 API 参数——上线前,从模型到用户的屏幕,一段一段地确认数据真的在"流"——这样它才能在你的产品里,稳定地给用户那种"回答正在涌现"的顺滑体验。

关键概念速查

概念 说明
逐 token 生成 大模型一次产出一个 token,长回答的生成要持续好几秒
非流式调用 等模型生成完全部内容,一次性返回完整响应
流式调用 模型每生成一小段就立刻推送,边生成边传递
SSE Server-Sent Events,基于 HTTP 让服务端持续推送数据给前端
首字延迟 从发起请求到用户看到第一个字的时间,流式下大幅缩短
中途错误 流式下错误可能发生在已推送部分内容之后,需发错误事件
AbortController 前端中止 fetch 请求的机制,用于实现取消生成
连接断开感知 后端检测前端是否已断开,据此停止模型调用、停止计费
代理缓冲 Nginx 等默认会缓冲响应,须关闭否则流式被攒成坨
流式用量统计 token 用量常在流末尾的特殊片段,需专门收集用于计费

避坑清单

  1. 不要把调用大模型当成普通接口:它的生成过程长达数秒,非流式会让用户空等。
  2. 不要用非流式做长回答对话:已生成的内容被压着不发,用户面对空白屏幕。
  3. 不要只在后端加 stream=True:流式是端到端链路,前端不读流照样是空等。
  4. 不要用 HTTP 状态码表达流式中途错误:响应头已发出,只能在流里发错误事件。
  5. 不要忽视"半成功"状态:前端要区分一字未出就失败和中途断,处理方式不同。
  6. 不要只做前端取消:前端断开后,后端不停止模型调用,token 会继续白烧。
  7. 不要不分场景硬上流式:给程序解析的 JSON 该拼完再解析,别上增量解析复杂度。
  8. 不要忘了关代理缓冲:Nginx 默认缓冲会把流攒成坨,须 proxy_buffering off。
  9. 不要漏掉流式下的用量统计:token 用量在流末尾,要专门收集用于计费监控。
  10. 不要让前端每个字都重渲染:长回答的 Markdown 全量重渲会越来越卡,要节流。

总结

回头看第一版那个"等生成完再返回"的对话接口,它的错误很典型。它不在某一行代码,而在一个对大模型调用的根本误解:以为调模型和调一个普通后端接口一样,发请求、等响应、拿结果。真相是,大模型是逐 token 生成的,一个长回答的生成过程要持续好几秒——这是一个有时间长度的"过程",而非流式调用把这个过程压成了一个"点",代价就是用户面对空白屏幕的全部等待。流式做的,就是把这个过程重新展开,让模型产出的每一小段,实时地流到用户眼前。

而把流式做扎实,工程量并不小。它不是加一个 stream=True 参数那么简单,而是要用 SSE 打通模型到前端的整条链路,要重新设计能表达"中途失败"的错误处理,要做到前端能取消、后端能感知并停止计费,要分清流式和结构化输出各自的诉求,还要关掉代理缓冲、收集流式用量、设好长连接超时、给前端渲染做节流。一套真正顺滑的流式对话,是这些环节一个不少地拼起来的。

这件事其实很像在餐厅吃一顿多道菜的晚餐。非流式调用,就像你点完菜,后厨把前菜、汤、主菜、甜点全部做好,摆满一整个托盘,才一次性端到你面前——在那之前,你对着空桌子,饿着,也不知道厨房到底在不在忙。流式调用,则是这道菜好了先上这道,你一边吃前菜,后厨一边做主菜,全程你的桌上都有东西、都有进展。同样是吃完整顿饭,后一种方式里,你从不需要对着空桌子干等。而取消能力,就是你吃到一半饱了,可以叫住服务员说"后面的菜不用做了"——非流式那一整托盘早就做好了,想省也省不下;流式里没做的菜,是真的可以不做、不收钱的。

这类问题还有一个共同的麻烦:它在开发和测试时很难暴露。你自己测,问的都是"翻译一句话""今天星期几"这种短问题,模型一秒不到就生成完了,非流式和流式根本看不出区别——你会觉得"等完整响应"这套挺好。真正会把这个问题撑开的,是上线后真实用户问的那些需要长篇回答的复杂问题:模型要生成七八秒,非流式下用户对着空白屏幕的七八秒,足以让一大批人以为系统卡死、刷新或离开。所以如果你正在做一个 AI 对话功能,别等用户抱怨"点了发送半天没反应",才回头想起流式。在写下第一个调用模型的接口时,就把它当成一个"消费持续数据流"的任务来设计,而不是一个"等一个响应"的普通接口——把模型的输出当成流,这是这篇文章最想留给你的一句话。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

数据库事务隔离级别完全指南:从一次"开了事务库存还是被超卖"看懂脏读幻读到底怎么挡

2026-5-22 20:39:18

技术教程

数据库读写分离完全指南:从一次"用户发完评论刷新就不见了"看懂主从复制延迟为什么坑人

2026-5-22 20:50:52

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索