2023 年我做一个数据处理任务,要给几万条数据——商品评论、工单记录、用户反馈——每一条都调一次大模型做分类和摘要。批量跑这件事,我压根没多想。第一版我做得很省事:批量跑一批 LLM 请求,不就是写个 for 循环,挨条调 API、把结果攒进一个列表,跑完一次性存下来?本地开发时——真不错:我拿几十条数据测,for 循环唰唰跑完,结果整整齐齐存进文件,几行代码搞定。我心里很踏实:"批量嘛,不就是循环挨个调?"可等这个任务真正拿去跑那几万条真实数据,一串问题冒了出来。第一种最先把我打懵:任务跑到第 4000 多条时,进程崩了——可能是网络抖动、可能是机器休眠——我一看结果文件空空如也,前面那 4000 多条辛辛苦苦调出来的结果,一条没留下,等于那一笔 API 钱白烧了。第二种最难缠:中途撞上一次限流,API 抛了个异常,整个 for 循环当场终止,后面还有一万多条根本没跑。第三种最头疼:崩了之后我想接着跑,可我根本不知道哪些已经跑过了——从头再跑一遍,前面那些等于又花一遍钱;手动去数着跳过,又怕数错、漏跑。第四种最莫名其妙:几万条里有那么几条,内容触发了模型报错,就这几条坏数据,把整个任务反复带崩——本来 99% 都能成功的作业,因为那 1%,一条结果都产不出来。我盯着这一连串问题想了很久才彻底想明白,第一版错在一个根本的认知上:我以为"批量跑一批 LLM 请求,就是写个 for 循环挨个调"。这句话把"批量推理"和我平时写的那种普通 for 循环,当成了同一种东西。可它们不是。一个普通的 for 循环,默认了一件根本不成立的事——它默认这批活儿会从头到尾、一次性顺顺利利地跑完。在内存里循环个几万次、做点纯计算,这个默认基本成立。可一批几万次的大模型 API 调用,根本不是这样:它是一个要跑很久的、必然会中途出岔子的长作业。几万次网络请求里,撞上限流、超时、网络抖动、进程被杀、机器休眠——出几次意外是必然的,不是万一。而一个 for 循环面对中途失败,只有两种结局:要么异常一冒,整个循环当场终止,后面的全没跑;要么你用 try except 把异常吞掉,可这条的结果就这么丢了。更要命的是,循环本身从不记录"我已经跑到哪儿了"——它一旦中断,重启就只能从第一条重新开始,前面成功的全部推倒重来、重新花钱。我第一版所有的麻烦,根上都是同一件事:我用一个"假设全程顺利"的循环,去干一件"中途必然出岔子"的长作业。真正做对批量推理,核心不是"写个 for 循环挨个调",而是把它当一个"会中途失败的长作业"来设计:每条结果即时落盘、重启时跳过已完成的、单条失败被隔离而不拖垮全局、失败的条目单独归档可重试,再配上并发控制和成本核算。这篇文章就把 AI 批量推理梳理一遍:为什么"循环挨个调"是错的、结果为什么要即时落盘、怎么做幂等续跑、单条失败怎么隔离、并发怎么控,以及失败重试、成本核算、结果校验、可观测这些把批量作业真正做扎实要避开的坑。
问题背景
先把那串问题的现象和我的误判讲清楚,后面所有的设计都是冲着纠正这个误判去的。
现象:一套"循环挨个调、跑完一次性存"的批量推理,在真正跑几万条数据时冒出一串问题:进程跑到第 4000 多条崩了,前面的结果一条没留下;中途撞一次限流,整个循环当场终止,后面一万多条没跑;想接着跑,却不知道哪些已经跑过,从头重跑就是重新花钱;几万条里几条坏数据,把整个作业反复带崩。
我当时的错误认知:"批量跑一批 LLM 请求,就是写个 for 循环,挨条调 API、攒进列表、跑完一次性存。"
真相:这个认知错在它把"批量推理"想象成了一段"短小、可靠、原子"的计算。在我脑子里,这个 for 循环就像在内存里给一个列表里的每个数字加一——它很快、不会失败、要么整段做完要么整段没做。可几万次大模型 API 调用,三个属性一个都不占。它不快:几万次网络往返,要跑几小时甚至几天。它会失败:每一次调用都是一次网络请求,要经过限流、超时、服务端抖动,几万次里出意外是必然。它不原子:它是几万个独立的小操作,完全可能"做成了三千个、剩下的没做"。一个要跑几小时、必然会中途失败、而且可以部分完成的东西,它不是一段计算,而是一个作业(job)。开头那四个问题,根上全是"拿对待计算的方式去对待作业":崩了结果全丢,是因为结果攒在内存里、跑完才落盘,可作业根本撑不到"跑完";限流终止全局,是因为一条的异常被允许掀翻整个循环;不知道跑到哪,是因为循环不记录进度、没有断点;几条坏数据带崩全局,是因为没把"单条的失败"和"作业的失败"分开。问题的根子清楚了:这不是"循环写得不够快"的小毛病,而是要换一个根本的认知——几万次 LLM 调用是一个长作业,做对它,就是要让这个作业可以随时中断、随时从断点接着跑、单条失败不影响其余。
要把批量推理做对,需要几块认知:
- 为什么"循环挨个调"是错的——它是个会中途失败的长作业,不是一段计算;
- 即时落盘——每处理完一条,结果立刻写下来,绝不攒在内存里;
- 幂等续跑——重启时读出已完成的,跳过它们,只跑没跑过的;
- 单条失败隔离——一条出错只算这一条的事,不许掀翻整个作业;
- 并发与限流——用并发提速,但并发数要压在 API 限流之下;
- 失败重试、成本核算、结果校验、可观测这些工程坑怎么处理。
一、为什么"写个 for 循环挨个调"是错的
先把这件最根本的事钉死:"循环挨个调"错在它脑子里有一幅错误的图景——它以为这几万次调用,会像内存里一个紧凑的循环那样,一口气、不出意外地跑到底。这幅图景对纯计算大致成立,对几万次网络 API 调用则彻底不成立。这里的关键差别,是"失败的概率"和"中断的代价"被同时放大了。失败概率上:一次内存计算几乎不可能失败,而一次 API 调用,要穿过你的网络、对方的网关、对方的限流器、对方的模型服务,任何一环抖一下它就失败——单次失败率哪怕只有千分之一,几万次累计下来,"整批一次都不失败"反而成了几乎不可能的事。中断代价上:纯计算循环跑得快,崩了重来一遍也就几秒;而批量推理跑几小时、还每一条都花了真金白银的 API 费用,跑到一半崩掉重来,损失的是几小时和一大笔钱。一个"高概率会中途失败、且中断代价极高"的东西,你绝不能用"假设它会一次性顺利跑完"的方式去写。可一个朴素的 for 循环,恰恰就是这个错误假设的化身。
下面这段代码,就是我那个"几十条测着没事、几万条一跑就崩"的第一版:
# 反面教材:把几万次 LLM 调用当成一个普通 for 循环
def run_batch(items):
results = []
for item in items: # 破绽 1:循环不记录"跑到哪了",崩了只能从头再来
answer = call_llm(item["text"]) # 破绽 2:一次限流/超时抛异常,整个循环当场终止
results.append(answer) # 破绽 3:结果全攒在内存里
save_all(results) # 破绽 4:只有循环完整跑完,这一行才执行 —— 没跑完=全丢
return results
这段代码在本地用几十条数据测时表现不错,因为几十次调用,跑得快、撞上失败的概率也低——它恰好一次性顺利跑完了,你看不出任何破绽。它的问题不在某一行语法上——for、append、save_all,语法都对——而在它把四个"必然会发生的事"全都赌成了"不会发生":它赌循环不会中断(破绽 1,没有断点),赌每一条都不会抛异常(破绽 2,一条炸了全局终止),赌内存里的结果能一直撑到最后(破绽 3),赌 save_all 那一行一定执行得到(破绽 4,没跑完就全丢)。几十条数据时,这四个赌注恰好都赢了;几万条数据、跑上几小时,它们必输无疑。问题的根子清楚了:做对批量推理,第一步不是把循环写得更快,而是承认"这是一个会中途失败的长作业",然后把上面那四个赌注,一个一个换成"失败了也不要紧"的设计。下面五节,就是这件事怎么落地。
二、即时落盘:每处理完一条立刻 checkpoint
四个破绽里,最致命的是"结果攒在内存里、跑完才落盘"——它意味着只要作业没能撑到最后一行,之前所有的成果和花掉的钱就一起清零。所以第一条规矩就是:每处理完一条,它的结果就立刻写到磁盘上去。用 JSONL(每行一个 JSON)这种可追加的格式最合适:
import json
def append_result(path, item_id, output):
"""每处理完一条,立刻追加写一行 —— 进程下一秒被杀,已经写下的也不丢。"""
with open(path, "a", encoding="utf-8") as f: # "a" 追加模式:只往后加,不动旧内容
line = json.dumps({"id": item_id, "output": output}, ensure_ascii=False)
f.write(line + "\n") # 一条结果就是独立的一行
这里的认知要点是:"即时落盘"这条规矩,本质上是在给一个长作业不断地设"存档点(checkpoint)"。它要对抗的,是一个朴素循环最脆弱的假设——"我一定能跑到最后那行 save_all"。这个假设一旦被打破(而对长作业来说它必然被打破),攒在内存里的一切就随进程一起蒸发。即时落盘把这个"全有或全无"的赌局,拆成了几万个"已落袋的就再也丢不了"的小确定:第三千条刚写完磁盘,进程第三千零一条就被杀,你也稳稳保住了三千条成果。这里有两个细节值得说。第一,为什么用 JSONL、为什么用追加模式。批量作业的结果是一条一条产出的,追加写(模式 "a")每次只在文件末尾加一行,既不需要把整个文件读进来再整体重写,也不会因为写到一半崩溃就毁掉前面已写好的内容——它天然适合"边产出边落盘"。而 JSONL 每行一个独立 JSON,坏了某一行也不影响其余行能被解析,比"把几万条塞进一个大 JSON 数组"健壮得多。第二,要明白即时落盘换来的是什么——它换来的不是"作业不会失败",作业照样会失败;它换来的是"失败的代价从'全部'降到了'最后没存下的那一条'"。失败这件事你阻止不了,但你能让每一次失败都只损失极小的一点点。把这一点想通,你就理解了所有长作业容错设计的出发点:不是追求不失败,而是让失败变得廉价。结果即时落盘了,可这些落在磁盘上的"存档",得真的能在重启后被用起来——这是下一节。
三、幂等续跑:重启时跳过已完成的
即时落盘,只解决了"已完成的不丢";它还没解决"重启之后,怎么接着上次的进度往下跑"。如果重启还是从第一条开始,那前面落盘的几千条就要被重新调用、重新花钱。所以第二条规矩是幂等续跑:重启时,先把已经完成的 id 读出来:
def load_done(path):
"""读出结果文件里已经完成的 id 集合 —— 这是"续跑"的唯一依据。"""
done = set()
try:
with open(path, encoding="utf-8") as f:
for line in f:
if line.strip(): # 跳过可能存在的空行
done.add(json.loads(line)["id"])
except FileNotFoundError:
pass # 第一次跑,结果文件还不存在,正常
return done
有了"已完成集合",主循环就该先查再做——已经做过的,直接跳过:
def run_batch(items, result_path="results.jsonl", error_path="errors.jsonl"):
"""批量作业主循环:跳过已完成、逐条落盘、失败单独归档。"""
done = load_done(result_path) # 启动先读断点:哪些已经成功了
total, skipped, failed = len(items), 0, 0
for i, item in enumerate(items, 1):
if item["id"] in done: # 续跑核心:做过的直接跳,不重复花钱
skipped += 1
continue
r = process_one(item) # 单条处理(下一节细讲)
if r["ok"]:
append_result(result_path, r["id"], r["output"])
else:
append_result(error_path, r["id"], r["error"]) # 失败的进 errors 文件
failed += 1
print(f"进度 {i}/{total} 已跳过 {skipped} 失败 {failed}")
return total, skipped, failed
这里的认知要点是:"幂等续跑"这个词,落到操作上就是一句话:让这个批量作业,无论被中断多少次、重启多少次,最终的效果都和"一次性顺利跑完"完全一样。"幂等"的意思就是"重复执行不会产生额外的副作用"——对批量推理来说,额外的副作用就是"对同一条数据重复调用、重复付费"。要做到幂等,关键是作业得有"记忆":它得知道"哪些已经做过了"。而这个"记忆",不该是另外维护一个进度文件、记一个"我跑到第几条"的计数器——那种计数器很脆弱,它假设数据顺序永远不变,还可能在崩溃的瞬间没来得及更新而和实际进度对不上。真正可靠的"记忆",就是结果文件本身:results.jsonl 里已经躺着的那些 id,就是"已完成"这件事最权威、最不会骗人的证据——因为一个 id 出现在那里,当且仅当它的结果真的被成功写下来了。load_done 做的,就是把这份"证据"读成一个集合,主循环再拿每一条的 id 去集合里一查,就知道该不该跳过。这套"结果文件即进度"的设计还附带了一个好处:它对数据顺序、对你怎么切分批次,统统不敏感——哪怕下次重启时输入数据的顺序变了、甚至你把一大批拆成几小批分别跑,只要 id 不变,已完成的就一定会被正确跳过。一句话:不要去额外记一个"我跑到哪了"的进度,要让"已经产出的结果"自己来回答"哪些做过了"。能跳过已完成的了,可还有个破绽没堵——一条数据出错,凭什么不让它掀翻整个作业?这是下一节。
四、单条失败隔离:别让一条坏数据带崩整个作业
前面主循环里调了一个 process_one,它就是单条失败隔离的关键。规矩是:处理单独一条数据时,无论成功还是失败,都要把结果包成一个"结构化的结果",绝不让异常抛到外层的主循环里去:
def process_one(item):
"""处理单条:成功失败都包成结构化结果,绝不把异常抛到外层主循环。"""
try:
output = call_llm(item["text"])
return {"id": item["id"], "ok": True, "output": output}
except Exception as e:
# 关键:一条出错,只是这一条的事 —— 捕获下来、标记成 ok=False,别让它掀翻整个作业
return {"id": item["id"], "ok": False, "error": repr(e)}
下面这张图,把一条数据在批量作业里走的整条路画出来:
这里的认知要点是:单条失败隔离,要害是在脑子里把两种"失败"彻底分开——"一条数据处理失败"和"整个作业失败",它们是完全不同量级的两件事,绝不能让前者自动升级成后者。一个朴素的 for 循环最危险的地方就在这里:循环体里任何一条抛出未被捕获的异常,异常会顺着调用栈一路冒上去,把整个循环掀翻——于是"第 4001 条这一条数据有问题"这种小事,直接变成了"后面一万多条全部没跑"这种大事故。process_one 这个函数做的,就是在每一条数据的外面套一个"防爆罩":它内部 try except 把一切异常都接住,无论这一条是成功是失败,它对外永远只返回一个规规矩矩的字典——成功就是 ok=True 带着 output,失败就是 ok=False 带着 error。这样一来,外层主循环收到的永远是"数据",而不是"异常",主循环就再也不会被任何一条掀翻了。这里有个观念要扭过来:在批量作业里,"某些条目失败"不是异常,而是正常——几万条真实数据里,总会有内容过长的、格式古怪的、触发模型安全拦截的。你要做的不是"消灭失败",而是"接住失败、记录失败、隔离失败"。所以失败的条目也要即时落盘——把它的 id 和错误原因写进一个单独的 errors 文件。这个 errors 文件有双重价值:一来它让失败的那条数据本身也不会丢,二来它天然就是一份"待重试清单",下一节的重试,靠的就是它。一句话:让单条的失败,永远停在单条;一个 99% 能成功的作业,就该稳稳交付那 99%。作业能正确、能续跑、能隔离失败了,可几万条一条条串着跑还是太慢——下一节讲并发提速。
五、并发与限流:批量作业的速度与安全
一条一条串行地调,几万次网络往返要跑很久。LLM 调用大部分时间花在等网络响应上,正适合用并发——同时发出多个请求。但并发数必须有个上限,这个上限要压在 API 的限流阈值之下:
from concurrent.futures import ThreadPoolExecutor, as_completed
def run_batch_concurrent(items, result_path="results.jsonl",
error_path="errors.jsonl", workers=8):
"""并发版:用线程池提速,但并发数 workers 要压在 API 限流之下。"""
done = load_done(result_path)
todo = [it for it in items if it["id"] not in done] # 续跑:只把没做过的提交进去
with ThreadPoolExecutor(max_workers=workers) as pool: # workers 就是并发上限
futures = {pool.submit(process_one, it): it for it in todo}
for fut in as_completed(futures): # 哪条先回来就先处理哪条
r = fut.result()
if r["ok"]:
append_result(result_path, r["id"], r["output"])
else:
append_result(error_path, r["id"], r["error"])
这里的认知要点是:并发这件事,在批量推理里要同时握住两头——一头是"提速",一头是"别把自己提到限流里去"。先说为什么并发能提速,而且提速幅度很大。一次 LLM 调用,你的程序绝大部分时间不是在算什么,而是在干等——等请求穿过网络、等对方的模型生成完、等响应再穿回来。串行地跑,意味着这些"干等"的时间一段接一段地累加;而并发地跑,意味着这一条在等的时候,另外七条也在同时等——等待的时间被叠在了一起,总耗时于是被压缩了几倍。所以 LLM 批量调用是并发的绝佳场景。但并发数绝不是越大越好,这就是另一头——限流。任何一个 LLM API 都有限流:每分钟多少次请求、每分钟多少 token,是有明确上限的。你把并发开到一百,看似很猛,实际上大概率会瞬间撞破对方的限流阈值,于是对方开始大批大批地拒绝你的请求、返回 429 错误——你不但没更快,反而因为海量请求失败,变得更慢、更乱。所以 workers 这个并发数,要设成一个"能把你的吞吐顶到接近限流线、但不越过它"的值,它取决于你这个 API key 的具体配额,需要你去查、去测。这里还有一个容易被忽略的点:即时落盘和续跑那套设计,在并发下不能丢。你看 run_batch_concurrent 里,提交任务前依然先 load_done 过滤了一遍,每个任务回来依然是立刻 append 落盘——并发只是改变了"任务怎么被调度执行",它没有、也不该改变"每条结果都立刻落盘、重启都能跳过已完成"这个根基。一句话:用并发把等待时间叠起来换速度,但要把并发数死死压在 API 限流线之下。主干都齐了,最后是几个把批量推理真正用到生产里才会撞见的工程坑。
六、工程坑:失败重试、成本核算、结果校验、可观测
主干之外,还有几个工程坑,不处理就会让你的批量作业在边角上出问题。坑 1:失败的条目要能单独重试,而不是整批重来。上一节那个 errors.jsonl,记下了所有失败的 id——它天然就是一份"待重试清单"。重试时,只把这些失败的挑出来再跑一遍:
def retry_errors(items, error_path="errors.jsonl", result_path="results.jsonl"):
"""重试:只把上一轮失败的条目挑出来再跑一遍,而不是整批重来。"""
failed_ids = load_done(error_path) # errors 文件里记的就是失败的 id
retry_items = [it for it in items if it["id"] in failed_ids]
open(error_path, "w").close() # 清空旧错误档,这一轮重新记
for item in retry_items:
r = process_one(item)
if r["ok"]:
append_result(result_path, r["id"], r["output"])
else:
append_result(error_path, r["id"], r["error"]) # 还失败的,留待下一轮
坑 2:成本要实时核算,别等账单来了才知道。批量作业一跑就是几万次调用,每一次都在花钱。要在跑的过程中,按 token 用量实时累加估算花费,心里有数:
def estimate_cost(usage, price_in_per_1k, price_out_per_1k):
"""按 token 用量估算花费 —— 批量作业必须一边跑一边盯着成本。"""
# usage 形如 {"prompt_tokens": 1200, "completion_tokens": 300}
cost_in = usage["prompt_tokens"] / 1000 * price_in_per_1k # 输入 token 的钱
cost_out = usage["completion_tokens"] / 1000 * price_out_per_1k # 输出 token 的钱
return cost_in + cost_out
坑 3:模型"答了"不等于"答对了格式",结果要校验。如果你要求模型输出 JSON,那解析失败、缺字段,都该算这一条失败——别把一条格式不对的输出当成功存进去:
def process_one_validated(item):
"""处理单条并校验输出 —— 模型"答了"不等于"答对了格式"。"""
try:
raw = call_llm(item["text"])
data = json.loads(raw) # 要求输出 JSON:解析失败就算这条失败
if "label" not in data: # 再校验必需字段在不在
raise ValueError("模型输出缺少 label 字段")
return {"id": item["id"], "ok": True, "output": data}
except Exception as e:
return {"id": item["id"], "ok": False, "error": repr(e)}
坑 4:作业跑完要打一份"账",成败一目了然。几万条跑完,你得立刻知道成了多少、败了多少、还差多少,而不是去肉眼数文件:
def report(total, result_path="results.jsonl", error_path="errors.jsonl"):
"""作业跑完打一份账:成了多少、败了多少、还差多少。"""
ok = len(load_done(result_path))
bad = len(load_done(error_path))
print(f"总数 {total} | 成功 {ok} | 失败 {bad} | 未处理 {total - ok - bad}")
if bad:
print(f"有 {bad} 条失败,详见 {error_path},可用 retry_errors 单独重跑")
坑 5:重试要带退避,别对着限流硬撞。失败里很大一部分是限流(429)和超时——这类失败,立刻原样重试只会继续撞墙。重试前要等一会儿,而且每重试一次等得更久(指数退避),给对方的限流器留出恢复的时间。坑 6:输入数据的 id 必须稳定且唯一。整套续跑都建立在"用 id 判断这条做没做过"之上——如果 id 是按行号临时生成的,那下次输入顺序一变,id 全错位,续跑就全乱了。id 要来自数据本身、稳定不变(比如数据库主键、内容哈希)。坑 7:别把超大输入闷头塞给模型。几万条真实数据里,总有那么几条长得离谱——直接塞过去要么超出上下文窗口报错、要么烧掉惊人的 token。在调用之前就该按长度筛一道:过长的截断,或单独挑出来另作处理。坑 8:大批量作业,结果别只写一个文件。几十万条结果全堆进一个 jsonl,文件会大到难以处理、load_done 每次启动都要全文件扫一遍。量大时要按批次分文件(比如每一万条一个文件),或把"已完成 id"单独存成一份索引。
关键概念速查
| 概念 / 手段 | 说明 |
|---|---|
| 作业不是循环 | 几万次 LLM 调用是会中途失败的长作业,非一段计算 |
| 循环挨个调的错 | 赌全程不中断、不抛错、内存撑到底,长作业必输 |
| 即时落盘 | 每条结果立刻追加写盘,失败代价从"全部"降到"一条" |
| JSONL 追加格式 | 每行一个 JSON,可追加、坏一行不毁全文件 |
| 幂等续跑 | 重启读已完成 id,跳过做过的,效果等同一次跑完 |
| 结果文件即进度 | 已落盘的 id 就是最权威的进度,不另记计数器 |
| 单条失败隔离 | 异常包成结构化结果,一条失败不掀翻整个作业 |
| errors 即重试清单 | 失败条目落进单独文件,天然是待重试清单 |
| 并发压在限流下 | 并发叠起等待时间提速,但并发数不得越过限流线 |
| 成本实时核算 | 按 token 用量边跑边估花费,别等账单才知道 |
避坑清单
- 把几万次 LLM 调用当"会中途失败的长作业",不是一段顺跑的循环。
- 每处理完一条,结果立刻追加落盘,绝不把结果攒在内存里等跑完。
- 结果用 JSONL 每行一条,可追加、坏一行不影响其余行。
- 重启时先读已完成的 id 集合,跳过做过的,只跑没跑过的。
- 用结果文件本身当进度依据,别另记一个脆弱的"跑到第几条"。
- 单条处理用 try except 包住,异常包成结构化结果,不抛给主循环。
- 失败条目连同错误原因写进单独的 errors 文件,作为待重试清单。
- 用并发提速,但并发数压在 API 限流阈值之下,别撞出大批 429。
- 重试要带指数退避,边跑边按 token 核算成本,跑完打一份成败账。
- 输入 id 要稳定唯一,超长输入先筛一道,结果量大时按批次分文件。
总结
回头看那串"跑到第 4000 条崩了结果全丢、限流终止整批没跑、不知道跑到哪、几条坏数据带崩全局"的问题,以及我后来在批量推理上接连踩的坑,最该记住的不是某一个函数的写法,而是我动手前那个想当然的判断——"批量跑一批 LLM 请求,就是写个 for 循环挨个调"。这句话错在它把"批量推理"和一段普通的内存循环,当成了同一种东西。我以为循环挨个调、攒进列表、跑完一存,这件事就办成了。可我忽略了一件最要紧的事:几万次大模型 API 调用,根本不是一段"短小、可靠、原子"的计算,而是一个要跑几小时、必然会中途失败、而且可以部分完成的长作业。一个朴素的 for 循环,骨子里赌的是"全程顺利跑完";而一个长作业,中途出岔子不是万一,是必然。这个错配,本地用几十条数据测时根本看不出来——几十次调用跑得快、撞上失败的概率又低,它恰好把那个"全程顺利"的赌注赢了下来;它只会在真正跑那几万条、跑上几小时的时候,以"崩在半路、成果归零"的方式爆出来。
所以做对批量推理,真正的功夫不在"把循环写得更快"那几行上。循环本身不难。真正的功夫,在于你要从一开始就承认"这是一个会中途失败的长作业",然后把那个朴素循环的每一个赌注,都换成"失败了也不要紧"的设计:你不能赌内存撑到最后,就每处理完一条立刻把结果落盘;你不能赌作业一次跑完,就让它能读着断点、跳过已完成的接着跑;你不能赌每一条都不出错,就把单条的失败用 try except 隔离起来、单独归档;你想用并发提速,就把并发数死死压在 API 限流线之下;而到了失败重试、成本核算、结果校验这些边角上,你还要处处守住,别让作业又在某个角落功亏一篑。这篇文章的几节,其实就是顺着这套规矩展开的:先想清楚"循环挨个调"为什么错,再讲即时落盘、幂等续跑、单条失败隔离、并发与限流,最后是重试、成本、校验、可观测这几个把批量作业守扎实的工程细节。
你会发现,批量推理这件事,和现实里"一个人怎么寄出一万封信"完全相通。一个不靠谱的人会怎么寄?他把一万封信一股脑全抱在怀里,挨家挨户地塞邮筒,塞一封从怀里抽一封。塞到第四千封时,他脚下一滑摔了一跤,怀里剩下的六千封连同记不清的那四千封,全撒了一地、混作一团——他根本说不清哪些已经寄出去了、哪些还没,只能把这一万封从头再寄一遍,前四千封的邮票钱白花了;更糟的是,中间有几封地址写得有问题,他在那几封上卡住、急得团团转,结果剩下九千多封好信也跟着耽搁了。而一个靠谱的人怎么做?他每寄出去一封,就立刻在一个本子上记下这封的编号(这就是即时落盘);万一中途摔了、第二天接着干,他先翻一遍本子,凡是记过的就直接跳过(这就是幂等续跑);遇到地址有问题的那几封,他不在那儿干耗,而是把它们单独挑进一个小袋子,贴上"待核对",先把其余九千多封好信寄完,回头再单独处理那一小袋(这就是单条失败隔离);他还找来几个帮手一起跑,但人数不会多到把小小的邮局柜台挤爆(这就是并发压在限流之下)。同样是寄一万封信,不靠谱的人赌自己一趟不会摔、每封都没问题,靠谱的人则一开始就认定"必然会摔、必然有几封有问题",于是每一步都为"摔了之后还能接着干"做好了准备——差别不在"寄信这件事本身难不难",只在寄信的人心里有没有"这是个会中途出岔子的长差事"这根弦。
最后想说,批量推理做没做对,差距永远不会在"本地拿几十条数据测一测"时暴露——本地几十次调用,跑得飞快、又几乎不会撞上失败,你那个朴素的 for 循环恰好一次性顺顺利利跑完了,结果整整齐齐落进文件,你自然觉得"批量嘛,循环挨个调"一点问题都没有。它只在真实的、要跑几万条几小时、会撞限流会被中断的生产作业里才显形。那时候它会用最难堪的方式给你结账:做不好,你会因为进程崩在半路,眼睁睁看着几千条结果和那一笔 API 费用一起归零,会因为一次限流,让整个作业当场终止、后面一万多条根本没跑,会因为几条坏数据,让一个本该 99% 成功的作业一条结果都产不出来;而做对了,你的作业每跑完一条就稳稳落袋一条,被中断多少次都能从断点接着跑,几条坏数据被干净利落地隔离进重试清单,几万条数据无论跑多久、断多少回,最终都精确无误地交付出来。所以别等"跑到一半崩了、成果归零"那一刻找上门,在你写下批量调用的第一个 for 时就该想清楚:这里每条结果落盘了吗、崩了能续跑吗、一条失败会不会带崩全局、并发压在限流下了吗、成本我盯着吗,这一道道关口,我是不是都替这个长作业守住了?这些问题有了答案,你交付的才不只是一套"本地几十条看着对"的代码,而是一个无论数据有多少、作业被打断多少回,每一条都不丢、不重、不漏的、让人放心的批量系统。
—— 别看了 · 2026