2024 年我给一个产品接大模型的流式对话:用户问一句,答案像打字机一样一个字一个字蹦出来。前端怎么接这个流?这件事我压根没多想。第一版我做得很顺手:fetch 拿到响应,用 response.body 的 reader 一段一段读,每读到一段就解码成字符串,按空行 \n\n 切成几个 SSE 事件,每个事件去掉 data: 前缀,JSON.parse 一下,把里面的增量文字拼到界面上。就完事了。本地测一测——真不错:字一个个往外冒,丝般顺滑,我连着问了七八个问题,每次都好。我心里很踏实:"流式嘛,不就是网络一段段返回,我一段段解析、一段段显示?"可等这东西真正上线、面对长回答和真实网络,一串问题冒了出来。第一种最先把我打懵:控制台偶尔飘红——Unexpected end of JSON input,JSON.parse 炸了,可它不是每次都炸,十次里偶尔崩一两次,大部分时候好好的。第二种最难缠:用户反馈答案中间莫名少了一段,前言不搭后语,我盯着代码怎么看都看不出哪丢了。第三种最头疼:本地几乎不出错,可一部署到生产、请求过了一层 Nginx 反向代理,出错率直接飙上去,有时候字还会一大坨一大坨地卡着出,根本不"流式"了。第四种最莫名其妙:流到最后,总有一个 data: [DONE],我的 JSON.parse 一遇到它必定抛错。我盯着这一连串问题想了很久才彻底想明白,第一版错在一个根本的认知上:我以为"网络一次返回一个完整的 SSE 事件,我按 \n\n 切开,每段就是一个完整事件"。这句话把"网络分段的边界"和"SSE 事件的边界"当成了同一个东西。可这两件事毫无关系。我脑子里,后端发一个事件、网络就传一个事件过来,reader 每次 read 拿到的,正好是一个或几个完整的事件。可 HTTP 流式响应根本不是这样工作的。后端确实是一个事件一个事件往外写的,可这些字节流到网络上,会被 TCP、被中间的代理、被浏览器,按它们自己的缘由切成一个个 chunk——这个 chunk 切在哪里,纯粹由网络的缓冲、拥塞、MTU 这些底层因素决定,跟你的 SSE 事件边界没有半点关系。一次 reader.read() 拿到的那段数据,可能正好是一个完整事件,也可能是半个事件,可能是一个半事件,可能是三个事件拼在一起,还可能把一个事件从某个 JSON 的引号中间劈成两半。我那个第一版,假设每个 chunk 都是"齐整的事件",于是只要网络在某个事件中间切了一刀——这在长回答里几乎必然发生——我就会把半个 JSON 字符串喂给 JSON.parse,它当然报 Unexpected end of JSON input;而被切走的另外半个,在下一个 chunk 里以"半个事件开头"的形式到达,又对不上 data: 前缀,被我直接丢弃——这就是答案中间"少了一段"的真相。问题从来不是某个事件坏了,而是我根本没有把跨 chunk 的事件重新拼回去。真正接好流式响应,核心不是"一段段读、一段段解析",而是承认 chunk 边界和事件边界是两回事:你必须维护一个缓冲区,把每次读到的字节不断追加进去,然后只从缓冲区里切出那些"已经完整"的事件去解析,把末尾那段还不完整的残块留在缓冲区里,等下一个 chunk 来把它补全。这篇文章就把 LLM 流式响应的 SSE 解析这个坑梳理一遍:为什么"按 \n\n 切 chunk"是错的、SSE 协议到底长什么样、怎么用缓冲区做增量解析、TextDecoder 那个多字节字符被切断的暗坑、解析出事件后怎么处理 [DONE] 和错误事件,以及 Nginx 缓冲、超时、中断这些把流式做扎实要避开的坑。
问题背景
这个坑普遍,是因为"读一段、处理一段"这个直觉,在处理普通响应时一直是对的——你 fetch 一个 JSON 接口,等它读完、一次性 JSON.parse,从不出错。可流式响应是一类特殊的数据:它的"逻辑单元"(SSE 事件)和它的"传输单元"(网络 chunk)是两套独立的切分,而第一版代码错误地把它们当成了一套。它错得隐蔽,是因为短回答几乎测不出来:回答只有几十个字时,后端写得快、整个响应可能就在一两个 chunk 里发完,事件边界恰好没被切中,于是本地怎么测都对。它只在长回答 + 真实网络(尤其经过代理)时才暴露——回答越长、chunk 越多,某个 chunk 切中事件中间的概率就越接近 100%。
把这个现象拆开,错误认知和真相是这样对应的:
- 现象:
JSON.parse概率性报Unexpected end of JSON input;答案中间莫名少一段;上线过 Nginx 后错误率飙升、字还卡成大块;结束标记[DONE]必定让解析抛错。 - 错误认知一:以为一次
read()拿到的就是一个或几个完整 SSE 事件。真相是 chunk 边界由网络底层决定,可以切在事件的任意位置,包括 JSON 中间。 - 错误认知二:以为按
\n\n切开每段就能直接解析。真相是末尾那段往往是不完整的残块,必须留到下个 chunk 拼接,不能丢、也不能立即解析。 - 错误认知三:以为
data:后面跟的永远是 JSON。真相是[DONE]这类结束标记不是 JSON,流里还可能夹着错误事件,都要单独识别。 - 真相:流式解析必须有一个跨 chunk 的缓冲区。读到的字节先追加进缓冲区,再从中切出完整事件,残块留存等待补全——这是流式协议解析的通用骨架。
一、为什么"按 \n\n 切 chunk"是错的
先把第一版那个解析逻辑摆出来。它的思路就是字面意思:读一个 chunk,按空行切,逐个解析。
// 第一版:每来一个 chunk,按 \n\n 切开,每段当成一个完整 SSE 事件(反面教材)
async function streamChat(messages) {
const res = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages, stream: true }),
});
const reader = res.body.getReader();
const decoder = new TextDecoder();
while (true) {
const { value, done } = await reader.read();
if (done) break;
const text = decoder.decode(value); // 把这个 chunk 解码成字符串
for (const part of text.split('\n\n')) { // 按空行切事件
if (!part.startsWith('data: ')) continue;
const json = JSON.parse(part.slice(6)); // 偶尔在这里炸:Unexpected end of JSON input
appendToUI(json.choices[0].delta.content);
}
}
}
这段代码在短回答里跑得好好的。它的问题,要看清楚一个事实才显形:后端按事件写出去的字节流,到了网络上会被重新切分。后端确实是一个事件一个事件 write 的,但 reader.read() 每次返回的那个 chunk,边界在哪里,是 TCP 分段、操作系统缓冲、中间代理、浏览器接收缓冲这些因素共同决定的——它完全可能切在一个事件的正中间。下面是同一串 SSE 数据,被网络切成 chunk 后的真实样子:
后端按事件写出的完整数据:
data: {"choices":[{"delta":{"content":"今"}}]}\n\ndata: {"choices":[{"delta":{"content":"天"}}]}\n\n
网络实际切出来的 chunk(边界由网络决定,跟事件边界无关):
chunk 1 = data: {"choices":[{"delta":{"content":"今"}}]}\n\ndata: {"choi
chunk 2 = ces":[{"delta":{"content":"天"}}]}\n\n
看 chunk 1 的结尾:data: {"choi——这是第二个事件被劈开的前半截。第一版代码对 chunk 1 按 \n\n 切,切出两段:第一段是完整的第一个事件,正常;第二段是 data: {"choi,它以 data: 开头,通过了 startsWith 检查,于是 JSON.parse('{"choi') ——抛错。再看 chunk 2,它以 ces":[... 开头,这是事件的后半截,不以 data: 开头,被 continue 直接跳过——那个被劈开的事件,前半截解析失败、后半截被丢弃,整个事件就这么没了。这就同时解释了第一个怪现象(概率性 JSON.parse 报错)和第二个怪现象(答案中间少一段):它们是同一件事——网络切中了事件中间——的两个面。
这里要建立的第一个、也是最重要的认知是:在流式响应里,存在两套完全独立的"边界",你必须在脑子里把它们彻底分开。一套是"事件边界",由 SSE 协议定义,以 \n\n 为标志,它是后端按业务逻辑切出来的、有意义的单元。另一套是"chunk 边界",由网络传输决定,每次 reader.read() 返回一个 chunk,它的切分位置纯粹是 TCP 分段、缓冲区大小、代理转发这些底层机制的副产物,毫无业务含义。第一版代码最根本的错,就是默认这两套边界是重合的——以为网络会贴心地按事件边界来切 chunk。可网络根本不知道、也不关心你的 SSE 事件长什么样,它眼里只有字节流,它会在它觉得方便的任何字节位置切一刀。一旦你接受"chunk 边界是随机的、可以落在事件的任意位置,包括一个 JSON 字符串的引号中间",你就会明白:你绝不能拿单个 chunk 直接做解析,因为单个 chunk 在语法上根本不保证是完整的。你需要的是一个能"跨 chunk"工作的东西,把网络随意切碎的字节重新攒起来,再按你自己的事件边界去切。这个东西就是缓冲区。理解了这层,后面的解法就是水到渠成。
二、SSE 协议到底长什么样
要正确解析,得先看清楚要解析的东西。大模型流式接口用的是 SSE(Server-Sent Events,服务器发送事件)这套格式,它非常朴素,就是一段纯文本,规则只有几条:每个事件由一行或多行组成,以 字段名: 开头;事件与事件之间用一个空行(也就是 \n\n)分隔。大模型场景里用得最多的字段就是 data。一段典型的流式响应长这样:
data: {"choices":[{"delta":{"content":"今"}}]}
data: {"choices":[{"delta":{"content":"天"}}]}
data: {"choices":[{"delta":{"content":"天气不错"}}]}
data: [DONE]
几个要点必须看清。第一,每个 data: 后面跟的,是这一小步的增量内容(delta),不是完整答案——你要做的是把所有 delta 按顺序拼起来。第二,事件之间靠空行分隔,这个空行就是你切分事件的唯一依据。第三,也是最容易踩的——流的最后会有一个 data: [DONE],它是一个结束标记,[DONE] 这四个字符不是 JSON,你拿它去 JSON.parse 必定抛错。第四,SSE 还允许注释行(以 : 开头)和其他字段(如 event:、id:),解析时不认识的行直接忽略即可。
这里要建立的认知是:SSE 不是某个大模型厂商的私有发明,它是一个有正式规范的、浏览器原生支持的协议,你解析它时,是在解析一个标准,而不是在猜某个接口的格式。这个认知有两个实际好处。一是它让你知道哪些东西是"协议保证"的、可以放心依赖:事件一定以空行分隔、字段一定是"名字加冒号"的形式——这些是规范钉死的,你的解析器可以稳稳建立在它们之上。二是它让你知道哪些东西是"约定"而非"协议":比如 data: 里装的是 JSON、结束标记叫 [DONE],这些是大模型接口这一层的约定,不同厂商可能有细微差别(有的没有 [DONE]、有的多一个 event: 字段),你的代码要对这些约定保持一点弹性。把"协议层"和"约定层"分开看,你写出来的解析器就既稳——牢牢扎在协议上,又韧——能容下不同厂商的小差异。下一节就把这个解析器写出来。
三、用缓冲区做增量解析
解法的核心,是一个跨越所有 chunk 始终存在的缓冲区(buffer)。它的工作方式是一个固定的循环:每来一个 chunk,把它追加到缓冲区尾部;然后在缓冲区里反复找 \n\n,每找到一个,就说明从开头到这里是一个已经完整的事件,把它切出来去处理;一直切到再也找不到 \n\n——这时缓冲区里剩下的,正是那个"还没收全"的残块,原样留着,等下一个 chunk 来把它接上。
整个增量解析的流程是这样跑的:
把这个循环写成代码,核心就这么一小段。注意它每次只切出"分隔符之前"的部分,分隔符之后的一律留在缓冲区:
let buffer = ''; // 跨 chunk 累积的缓冲区,函数外长期存在
function feed(chunkText) {
buffer += chunkText; // 新数据先追加到缓冲区尾部
const events = [];
let idx;
// 只要缓冲区里还能找到事件分隔符 \n\n,就切出一个"完整"事件
while ((idx = buffer.indexOf('\n\n')) !== -1) {
const rawEvent = buffer.slice(0, idx);
buffer = buffer.slice(idx + 2); // 分隔符之后的部分,留在缓冲区里继续等
if (rawEvent) events.push(rawEvent);
}
// 循环结束时,buffer 里剩下的就是最后那个"还不完整"的事件残块
return events;
}
这段代码就是整个解法的心脏。它和第一版的差别,表面看只是多了一个 buffer 变量,实质却是两种完全不同的思路:第一版是"chunk 进来就地解析、解析完就扔",而它是"chunk 进来先攒着、只解析攒够了的部分、没攒够的继续攒"。一个事件哪怕被网络切成了三个 chunk,在这里也会被 buffer += 重新粘成一整条,直到那个 \n\n 出现才被切出去——它在跨 chunk 这件事上,是天然正确的。
这里要建立的认知是:缓冲区增量解析,不是一个"流式响应专用的技巧",它是处理一切"逻辑单元和传输单元不一致"的数据流的通用骨架——你在 TCP 粘包拆包、在解析分块的日志、在读一个超大文件的按行处理里,会一遍遍遇到同一个骨架。它的结构永远是这三步:一,把新到的数据追加进一个长期存在的缓冲区;二,在缓冲区里尽可能多地切出"完整的逻辑单元"去处理;三,把切不出的、不完整的尾巴留在缓冲区里,等下一批数据。这个骨架之所以正确,是因为它把一个棘手的问题做了干净的拆分:"数据有没有收全"这件事,被完全收敛到了缓冲区这一个地方来回答——能切出分隔符,就是收全了;切不出,就是没收全。你的解析逻辑因此可以建立在一个奢侈的前提上:我拿到手的每一个 rawEvent,都保证是完整的。第一版的乱,本质就是它没有这个"完整性检查点",每个 chunk 都被当成完整的来对待,而它们根本不是。一旦你脑子里有了这个三步骨架,你看任何流式数据都会本能地先问:它的逻辑单元是什么、分隔符是什么、缓冲区放在哪——想清楚这三个,解析器就已经写好一半了。
四、TextDecoder 的暗坑:多字节字符也会被切断
上面解决了"事件被切断",可还有一个藏得更深的切断问题:一个汉字,也会被 chunk 切成两半。reader.read() 返回的 value 不是字符串,是一段字节(Uint8Array)。一个汉字在 UTF-8 里占 3 个字节,而 chunk 边界是按字节切的——它完全可能切在某个汉字的第 1 个字节和第 2 个字节之间。如果你对每个 chunk 单独 decode,那半个汉字就会被解码成一个乱码 �。
// 错误:不带 stream 选项,多字节字符被 chunk 切断时会解码出乱码 �
const decoder = new TextDecoder();
const part = decoder.decode(chunk); // 一个汉字的 3 字节若跨 chunk,这里就乱了
// 正确:带 stream: true,解码器会把不完整的多字节序列暂存到下一次
const decoder2 = new TextDecoder('utf-8');
const part2 = decoder2.decode(chunk, { stream: true }); // 残缺字节自动留存待补全
关键就是 decode 的第二个参数 { stream: true }。带上它,TextDecoder 就会变成一个"有状态"的解码器:当一个 chunk 的末尾是半个汉字,它不会急着把那半个字节解码成乱码,而是把这几个残缺字节暂存在解码器内部,等你下次调用 decode 时,把它们和新 chunk 开头的字节拼起来,补成完整的汉字再解码。注意一个前提:要享受这个能力,你必须从头到尾复用同一个 TextDecoder 实例——它的"暂存状态"是存在实例里的,每个 chunk 都 new 一个新的,就前功尽弃了。
这里要建立的认知是:"被切断"这件事,在流式处理里是分层发生的,你得一层一层地把它接住。最外面一层是字节层——网络按字节切 chunk,于是一个多字节字符(汉字、emoji)会被劈开,这一层由 TextDecoder 的 stream: true 负责接住,它在"字节拼成字符"这个粒度上做缓冲。里面一层是事件层——字符流里,一个 SSE 事件会被劈开,这一层由你自己写的那个 buffer 负责接住,它在"字符拼成事件"这个粒度上做缓冲。这是两个不同粒度、各管一段的缓冲,缺了任何一个都会出问题:只有 buffer 没有 stream: true,你的事件边界是对的,但事件内容里会零星出现乱码 �;只有 stream: true 没有 buffer,你的每个字符都对,但事件还是会被从中间劈开。第一版代码两个都没有,所以两种错都会犯。把这个"分层接住"的图景记在心里:正确的流式管线,是字节先经过解码器的缓冲拼成完整字符,完整字符再经过你的缓冲区拼成完整事件,完整事件才轮到解析——每一层都在自己的粒度上,把下层随机的切断给抹平。下一节就把这两层缝进一个完整的函数里。
五、解析出事件之后:拼接、[DONE] 与错误事件
把第三、第四节的两层缓冲缝在一起,就是一个完整、正确的流读取函数。它一边用复用的 decoder 接住字节切断,一边用 buffer 接住事件切断:
async function readStream(res, onDelta, onDone, onError) {
const reader = res.body.getReader();
const decoder = new TextDecoder('utf-8'); // 整条流复用同一个实例
let buffer = '';
while (true) {
const { value, done } = await reader.read();
if (done) break;
// stream: true 接住跨 chunk 的多字节字符;再追加进事件缓冲区
buffer += decoder.decode(value, { stream: true });
let idx;
while ((idx = buffer.indexOf('\n\n')) !== -1) {
const rawEvent = buffer.slice(0, idx);
buffer = buffer.slice(idx + 2);
handleEvent(rawEvent, onDelta, onDone, onError);
}
}
// 流结束:缓冲区里可能还剩最后一个事件(末尾没补 \n\n)
if (buffer.trim()) handleEvent(buffer, onDelta, onDone, onError);
}
真正的"业务判断"集中在 handleEvent 里。它要做三件事:从可能多行的事件里取出 data: 行;识别出 [DONE] 这个非 JSON 的结束标记;识别出流里夹带的错误事件。这三件事都做了,才轮到 JSON.parse:
function handleEvent(rawEvent, onDelta, onDone, onError) {
// 一个事件可能有多行,只取以 data: 开头的行,去掉前缀
const payload = rawEvent
.split('\n')
.filter((line) => line.startsWith('data:'))
.map((line) => line.slice(5).trim())
.join('');
if (!payload) return; // 注释行、event: 行等,直接忽略
// 关键:[DONE] 是结束标记,不是 JSON,绝不能丢给 JSON.parse
if (payload === '[DONE]') {
onDone();
return;
}
let json;
try {
json = JSON.parse(payload);
} catch (e) {
// 走到这里通常意味着上游格式异常,而不是 chunk 没收全
// (没收全的事件根本切不出 \n\n,不会进到这里)
console.warn('跳过一个无法解析的事件:', payload);
return;
}
// 流式响应里也可能夹一个错误事件,要单独识别,别当成正常 delta
if (json.error) {
onError(json.error.message || '上游返回错误');
return;
}
const delta = json.choices?.[0]?.delta?.content;
if (delta) onDelta(delta); // 真正的增量内容,交出去拼接
}
调用方拿到 onDelta、onDone、onError 三个回调,各司其职:增量来了就拼接渲染,[DONE] 来了就收尾,错误来了就提示:
let answer = '';
await readStream(
res,
(delta) => { // onDelta:每来一段增量,拼接并渲染
answer += delta;
renderMarkdown(answer);
},
() => { // onDone:收到 [DONE],流正常结束
console.log('回答完成,共', answer.length, '字');
},
(message) => { // onError:流中夹带了错误事件
showError(message);
},
);
这里要建立的认知是:经过缓冲区切出来的 rawEvent,它在"语法结构"上是完整的——但"语法完整"不等于"内容就是一段可以直接拼的正常 JSON",这两件事必须分开。缓冲区解决的是"这个事件收全了没有",而 handleEvent 解决的是另一个层面的问题:"这个收全了的事件,到底是什么"。它可能是一段正常的增量内容,可能是 [DONE] 这个结束信号,可能是一个 error 错误事件,也可能是一行无意义的注释。第一版代码的另一个错,就是它默认"凡是 data: 后面的东西都是可以拼的 JSON 增量"——这个假设太天真,[DONE] 一来就破。正确的姿势是:对每一个完整事件,先做"分类"再做"处理"——先判断它是结束标记吗、是错误吗,排除掉这些特殊情况,剩下的才按正常 JSON 增量来解析。还有一个值得记住的细节:把这三种结果设计成 onDelta / onDone / onError 三个独立回调,而不是糊成一个,会让调用方的逻辑非常清爽——它不用自己去判断"这次是不是结束了",解析器已经替它分好了类。解析器负责"读懂流",调用方负责"用好结果",职责一清,代码就不乱。
六、工程里那些 SSE 的坑
解析逻辑对了,流式还有几个工程上反复咬人的坑。第一个,也是最坑的——Nginx 默认会缓冲响应,把你的流式效果彻底毁掉。这正是第三个怪现象(上线过 Nginx 后字卡成大块):反向代理默认会把上游响应攒到一定量或攒完才转发给浏览器,你后端一个字一个字写,到了用户那里却是一大坨一起到。必须在代理层显式关掉缓冲:
# Nginx 反代流式接口:必须关掉缓冲,否则 SSE 被攒成大块才下发
location /api/chat {
proxy_pass http://backend;
proxy_http_version 1.1;
# 关键:不缓冲上游响应,chunk 一到就立刻转发给浏览器
proxy_buffering off;
proxy_cache off;
# 流式回答耗时长,把读超时调大,别让长回答中途被掐断
proxy_read_timeout 300s;
# 部分环境还需这个响应头来禁用代理层缓冲
add_header X-Accel-Buffering no;
}
第二个,用户要能中断。长回答生成到一半,用户点"停止",或者切走了页面、组件卸载了——这条流必须能真正掐断,否则它会在后台继续跑、继续烧 token。靠 AbortController:
// 用户点"停止生成",或组件卸载时,要能真正中断这条流
const controller = new AbortController();
const res = await fetch('/api/chat', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ messages, stream: true }),
signal: controller.signal, // 把中断信号挂到请求上
});
stopButton.onclick = () => controller.abort(); // 任意时刻可中断
try {
await readStream(res, onDelta, onDone, onError);
} catch (e) {
if (e.name === 'AbortError') return; // 用户主动停止,属正常,不报错
throw e;
}
第三个坑在后端:SSE 的响应头和格式有硬性要求。Content-Type 必须是 text/event-stream,要尽早把响应头发出去,每个事件之间必须用 \n\n 分隔,最后别忘了补 [DONE]。一个 Node 端的转发示例:
// 后端:用 SSE 把上游大模型的流式响应转发给浏览器
app.post('/api/chat', async (req, res) => {
res.setHeader('Content-Type', 'text/event-stream');
res.setHeader('Cache-Control', 'no-cache');
res.setHeader('Connection', 'keep-alive');
res.flushHeaders(); // 立刻把响应头发出,别等 body
const upstream = await callLLM(req.body.messages);
for await (const chunk of upstream) {
// 每个事件之间用空行分隔,这是 SSE 的硬性格式
res.write(`data: ${JSON.stringify(chunk)}\n\n`);
}
res.write('data: [DONE]\n\n'); // 收尾标记
res.end();
});
第四个,别忽略流被中途掐断的情况。网络抖动、后端崩了、超时——流可能在没发 [DONE] 时就结束了。所以不能只靠"收到 [DONE]"来判断成功,还要在 readStream 的 while 跳出后检查:如果一个 delta 都没收到、又没收到 [DONE],这就是一次失败的流,要让用户知道、并提供重试。
这里要建立的认知是:一条流式响应,经过的是一整条"管道"——你的后端、Nginx 或别的反向代理、CDN、浏览器——而这条管道上的每一个环节,默认行为往往都是"为普通请求优化"的,而普通请求的优化方向,常常恰好和流式的诉求相反。普通请求追求吞吐,所以代理默认开缓冲、攒着批量发——这对流式是灾难。普通请求时间短,所以超时默认设得不长——这会掐断长回答。普通请求要么成功要么失败一目了然,所以你习惯了不处理"中途断"——而流式天然就有"发了一半断掉"这个中间态。这就是为什么流式功能"本地全对、上线就出问题":本地没有那条代理管道,你的请求是直连后端的,管道上每个环节的默认行为根本没机会作妖。所以做流式,你的视野不能只停在"前端解析对不对"这一个点上,你得把整条管道摊开来逐段检查:后端有没有按 SSE 格式正确地写、有没有及时 flush;代理层 proxy_buffering 关了没、读超时够不够长;前端有没有处理中断、有没有处理"没等到 [DONE] 流就断了"。流式响应的健壮,从来不是某一段代码写对就行,而是这一整条管道、每一个环节都为"数据要一点一点、实时地流过去"这件事让过路——任何一个环节没让,整条流式体验就垮在那里。
关键概念速查
| 概念 | 说明 | 关键点 |
|---|---|---|
| chunk 边界 | 每次 reader.read() 返回的数据段边界 | 由网络底层决定,可切在事件任意位置 |
| 事件边界 | SSE 事件之间的分隔,标志是 \n\n | 由后端按业务切出,与 chunk 边界无关 |
| 缓冲区增量解析 | 追加新数据 切出完整事件 残块留存 | 处理一切流式数据的通用三步骨架 |
| data: 前缀 | SSE 事件里承载内容的字段 | 大模型场景里装的是增量 delta 的 JSON |
| [DONE] 标记 | 流正常结束的收尾事件 | 不是 JSON,必须在 JSON.parse 前单独识别 |
| 错误事件 | 流中夹带的 error 字段事件 | 要单独识别,不能当成正常 delta 拼接 |
| TextDecoder stream | decode 带 stream: true 选项 | 接住跨 chunk 的多字节字符,须复用实例 |
| proxy_buffering | Nginx 是否缓冲上游响应 | 流式接口必须 off,否则字会卡成大块 |
| AbortController | 中断 fetch 与正在进行的流读取 | 用户停止或组件卸载时须能真正掐断 |
| 流中途断开 | 未收到 [DONE] 流就结束 | 不能只靠 [DONE] 判成功,要处理中间态 |
避坑清单
- 不要假设一次 read() 拿到的是完整 SSE 事件。chunk 边界由网络决定,可切在事件中间,甚至切在 JSON 引号之间。
- 必须维护一个跨 chunk 的缓冲区。新数据追加进去,只切出含 \n\n 的完整事件,残块留着等下一个 chunk。
- 不要直接对单个 chunk 做 JSON.parse。半个事件喂进去就是 Unexpected end of JSON input。
- TextDecoder 要带 stream: true 并复用同一个实例,否则跨 chunk 的汉字、emoji 会被解码成乱码 �。
- [DONE] 不是 JSON。解析前先单独识别结束标记,否则它必定让 JSON.parse 抛错。
- 流里可能夹错误事件。识别 error 字段并单独处理,别把错误内容当成正常答案拼上去。
- Nginx 等反向代理必须关 proxy_buffering。不关的话流式效果消失,字会攒成一大块才到。
- 代理读超时要调大。长回答生成时间长,默认超时会把流中途掐断。
- 用 AbortController 支持中断。用户停止、组件卸载时要真正断流,并把 AbortError 当正常情况处理。
- 处理流中途断开。不能只靠收到 [DONE] 判成功,没等到 [DONE] 就结束要算失败并支持重试。
总结
回头看,第一版栽的跟头,根子是一个认知误判:我以为网络会按 SSE 事件的边界把数据切成 chunk,一次 read() 拿到的就是齐整的一个或几个事件。可流式响应里有两套毫不相干的边界——事件边界是后端按业务切的、以 \n\n 为标志;chunk 边界是网络按它自己的缓冲和分段切的,落在哪里纯属随机。我把这两套边界当成了一套,于是只要网络在某个事件中间切一刀,我就会拿半个 JSON 去解析、把另外半个丢掉。问题从来不在"某个事件坏了",而在我根本没有把被网络切碎的事件重新拼回去的机制。
真正接好流式响应,工作量不在"把解析写得多巧",而在一次观念的转变:承认传输单元和逻辑单元是两回事,于是必须有一个缓冲区,在它们之间做翻译。一旦接受这一点,该做的事就都浮现出来了——维护一个跨 chunk 的缓冲区只切完整事件、用 TextDecoder 的 stream 选项接住被切断的多字节字符、把 [DONE] 和错误事件在解析前先分类出来、在代理层关掉缓冲、用 AbortController 支持中断。每一步都不复杂,难的是先承认:你拿到的每一个 chunk,在语法上都不保证是完整的,你不能直接信任它。
我后来常拿"隔着断断续续的电话听人报一串数字"来想这件事。对方一位一位地念,中间用停顿表示"这个数报完了"。可电话信号会卡顿,它卡在哪儿,跟对方报到第几位毫无关系——可能一句话还没说完信号就断了,可能两个数字连着一口气过来。如果你每次信号通的时候,就把听到的那截当成一个完整数字记下来,你记的全是错的。正确的做法是:听到什么先记在一张草稿纸上,只有当你听到那个表示"报完了"的停顿,才从草稿纸上把这一个完整的数字撕下来。那张草稿纸,就是缓冲区;那个停顿,就是 \n\n。信号怎么断,都不影响你最终把每个数字攒齐。
这类问题最咬人的地方,在于它在本地、在短回答时几乎永远是"对"的:回答短,整个响应一两个 chunk 就发完了,网络恰好没切中任何事件的中间,你怎么测都顺,功能演示一切完美。它只在真实网络下、回答变长、chunk 变多、再经过一层代理时才露出獠牙——而那时,概率性的解析报错和悄悄丢失的内容,已经发到用户面前了。所以别等线上零星报错才想起 chunk 边界:做流式功能的第一天,就该把"传输单元不等于逻辑单元"刻进设计里——先搭好缓冲区、先想清楚事件边界、先把 [DONE] 和错误事件的分类写对。把这件事在写第一行流式解析代码时就想清楚,你才算真正跳出了那个人人都会写、却在长回答里悄悄出错的"读一段、解析一段"。
—— 别看了 · 2026