AI 批量推理完全指南:从一次"跑到第 4000 条崩了、前面结果全丢了"看懂断点续传与失败隔离

2023 年我做一个数据处理任务要给几万条数据每一条都调一次大模型做分类和摘要批量跑这件事我压根没多想第一版我做得很省事批量跑一批 LLM 请求不就是写个 for 循环挨条调 API 把结果攒进一个列表跑完一次性存下来本地开发时真不错我拿几十条数据测 for 循环唰唰跑完结果整整齐齐存进文件几行代码搞定我心里很踏实可等这个任务真正拿去跑那几万条真实数据一串问题冒了出来第一种最先把我打懵任务跑到第 4000 多条时进程崩了我一看结果文件空空如也前面那 4000 多条辛辛苦苦调出来的结果一条没留下等于那一笔 API 钱白烧了第二种最难缠中途撞上一次限流 API 抛了个异常整个 for 循环当场终止后面还有一万多条根本没跑第三种最头疼崩了之后我想接着跑可我根本不知道哪些已经跑过了从头再跑一遍前面那些等于又花一遍钱手动去数着跳过又怕数错漏跑第四种最莫名其妙几万条里有那么几条内容触发了模型报错就这几条坏数据把整个任务反复带崩本来 99 都能成功的作业因为那 1 一条结果都产不出来我盯着这一连串问题想了很久才彻底想明白第一版错在一个根本的认知上我以为批量跑一批 LLM 请求就是写个 for 循环挨个调这句话把批量推理和我平时写的那种普通 for 循环当成了同一种东西可它们不是一个普通的 for 循环默认了一件根本不成立的事它默认这批活儿会从头到尾一次性顺顺利利地跑完可一批几万次的大模型 API 调用根本不是这样它是一个要跑很久的必然会中途出岔子的长作业几万次网络请求里撞上限流超时网络抖动进程被杀机器休眠出几次意外是必然的不是万一而一个 for 循环面对中途失败只有两种结局要么异常一冒整个循环当场终止要么你用 try except 把异常吞掉可这条的结果就这么丢了更要命的是循环本身从不记录我已经跑到哪儿了它一旦中断重启就只能从第一条重新开始真正做对批量推理核心不是写个 for 循环挨个调而是把它当一个会中途失败的长作业来设计每条结果即时落盘重启时跳过已完成的单条失败被隔离而不拖垮全局失败的条目单独归档可重试再配上并发控制和成本核算本文从头梳理为什么循环挨个调是错的结果为什么要即时落盘怎么做幂等续跑单条失败怎么隔离并发怎么控以及失败重试成本核算结果校验可观测这些把批量作业真正做扎实要避开的坑

2023 年我做一个数据处理任务,要给几万条数据——商品评论、工单记录、用户反馈——每一条都调一次大模型做分类和摘要。批量跑这件事,我压根没多想。第一版我做得很省事:批量跑一批 LLM 请求,不就是写个 for 循环,挨条调 API、把结果攒进一个列表,跑完一次性存下来?本地开发时——真不错:我拿几十条数据测,for 循环唰唰跑完,结果整整齐齐存进文件,几行代码搞定。我心里很踏实:"批量嘛,不就是循环挨个调?"可等这个任务真正拿去跑那几万条真实数据,一串问题冒了出来。第一种最先把我打懵:任务跑到第 4000 多条时,进程崩了——可能是网络抖动、可能是机器休眠——我一看结果文件空空如也,前面那 4000 多条辛辛苦苦调出来的结果,一条没留下,等于那一笔 API 钱白烧了。第二种最难缠:中途撞上一次限流,API 抛了个异常,整个 for 循环当场终止,后面还有一万多条根本没跑。第三种最头疼:崩了之后我想接着跑,可我根本不知道哪些已经跑过了——从头再跑一遍,前面那些等于又花一遍钱;手动去数着跳过,又怕数错、漏跑。第四种最莫名其妙:几万条里有那么几条,内容触发了模型报错,就这几条坏数据,把整个任务反复带崩——本来 99% 都能成功的作业,因为那 1%,一条结果都产不出来。我盯着这一连串问题想了很久才彻底想明白,第一版错在一个根本的认知上:我以为"批量跑一批 LLM 请求,就是写个 for 循环挨个调"。这句话把"批量推理"和我平时写的那种普通 for 循环,当成了同一种东西。可它们不是一个普通的 for 循环,默认了一件根本不成立的事——它默认这批活儿会从头到尾、一次性顺顺利利地跑完。在内存里循环个几万次、做点纯计算,这个默认基本成立。可一批几万次的大模型 API 调用,根本不是这样:它是一个要跑很久的、必然会中途出岔子的长作业。几万次网络请求里,撞上限流、超时、网络抖动、进程被杀、机器休眠——出几次意外是必然的,不是万一。而一个 for 循环面对中途失败,只有两种结局:要么异常一冒,整个循环当场终止,后面的全没跑;要么你用 try except 把异常吞掉,可这条的结果就这么丢了。更要命的是,循环本身从不记录"我已经跑到哪儿了"——它一旦中断,重启就只能从第一条重新开始,前面成功的全部推倒重来、重新花钱。我第一版所有的麻烦,根上都是同一件事:我用一个"假设全程顺利"的循环,去干一件"中途必然出岔子"的长作业。真正做对批量推理,核心不是"写个 for 循环挨个调",而是把它当一个"会中途失败的长作业"来设计:每条结果即时落盘、重启时跳过已完成的、单条失败被隔离而不拖垮全局、失败的条目单独归档可重试,再配上并发控制和成本核算。这篇文章就把 AI 批量推理梳理一遍:为什么"循环挨个调"是错的、结果为什么要即时落盘、怎么做幂等续跑、单条失败怎么隔离、并发怎么控,以及失败重试、成本核算、结果校验、可观测这些把批量作业真正做扎实要避开的坑。

问题背景

先把那串问题的现象和我的误判讲清楚,后面所有的设计都是冲着纠正这个误判去的。

现象:一套"循环挨个调、跑完一次性存"的批量推理,在真正跑几万条数据时冒出一串问题:进程跑到第 4000 多条崩了,前面的结果一条没留下;中途撞一次限流,整个循环当场终止,后面一万多条没跑;想接着跑,却不知道哪些已经跑过,从头重跑就是重新花钱;几万条里几条坏数据,把整个作业反复带崩

我当时的错误认知:"批量跑一批 LLM 请求,就是写个 for 循环,挨条调 API、攒进列表、跑完一次性存。"

真相:这个认知错在它把"批量推理"想象成了一段"短小、可靠、原子"的计算。在我脑子里,这个 for 循环就像在内存里给一个列表里的每个数字加一——它很快、不会失败、要么整段做完要么整段没做。可几万次大模型 API 调用,三个属性一个都不占。它不快:几万次网络往返,要跑几小时甚至几天。它会失败:每一次调用都是一次网络请求,要经过限流、超时、服务端抖动,几万次里出意外是必然。它不原子:它是几万个独立的小操作,完全可能"做成了三千个、剩下的没做"。一个要跑几小时、必然会中途失败、而且可以部分完成的东西,它不是一段计算,而是一个作业(job)。开头那四个问题,根上全是"拿对待计算的方式去对待作业":崩了结果全丢,是因为结果攒在内存里、跑完才落盘,可作业根本撑不到"跑完";限流终止全局,是因为一条的异常被允许掀翻整个循环;不知道跑到哪,是因为循环不记录进度、没有断点;几条坏数据带崩全局,是因为没把"单条的失败"和"作业的失败"分开。问题的根子清楚了:这不是"循环写得不够快"的小毛病,而是要换一个根本的认知——几万次 LLM 调用是一个长作业,做对它,就是要让这个作业可以随时中断、随时从断点接着跑、单条失败不影响其余。

要把批量推理做对,需要几块认知:

  • 为什么"循环挨个调"是错的——它是个会中途失败的长作业,不是一段计算;
  • 即时落盘——每处理完一条,结果立刻写下来,绝不攒在内存里;
  • 幂等续跑——重启时读出已完成的,跳过它们,只跑没跑过的;
  • 单条失败隔离——一条出错只算这一条的事,不许掀翻整个作业;
  • 并发与限流——用并发提速,但并发数要压在 API 限流之下;
  • 失败重试、成本核算、结果校验、可观测这些工程坑怎么处理。

一、为什么"写个 for 循环挨个调"是错的

先把这件最根本的事钉死:"循环挨个调"错在它脑子里有一幅错误的图景——它以为这几万次调用,会像内存里一个紧凑的循环那样,一口气、不出意外地跑到底。这幅图景对纯计算大致成立,对几万次网络 API 调用则彻底不成立。这里的关键差别,是"失败的概率"和"中断的代价"被同时放大了。失败概率上:一次内存计算几乎不可能失败,而一次 API 调用,要穿过你的网络、对方的网关、对方的限流器、对方的模型服务,任何一环抖一下它就失败——单次失败率哪怕只有千分之一,几万次累计下来,"整批一次都不失败"反而成了几乎不可能的事。中断代价上:纯计算循环跑得快,崩了重来一遍也就几秒;而批量推理跑几小时、还每一条都花了真金白银的 API 费用,跑到一半崩掉重来,损失的是几小时和一大笔钱。一个"高概率会中途失败、且中断代价极高"的东西,你绝不能用"假设它会一次性顺利跑完"的方式去写。可一个朴素的 for 循环,恰恰就是这个错误假设的化身。

下面这段代码,就是我那个"几十条测着没事、几万条一跑就崩"的第一版:

# 反面教材:把几万次 LLM 调用当成一个普通 for 循环
def run_batch(items):
    results = []
    for item in items:                   # 破绽 1:循环不记录"跑到哪了",崩了只能从头再来
        answer = call_llm(item["text"])   # 破绽 2:一次限流/超时抛异常,整个循环当场终止
        results.append(answer)            # 破绽 3:结果全攒在内存里
    save_all(results)                     # 破绽 4:只有循环完整跑完,这一行才执行 —— 没跑完=全丢
    return results

这段代码在本地用几十条数据测时表现不错,因为几十次调用,跑得快、撞上失败的概率也低——它恰好一次性顺利跑完了,你看不出任何破绽。它的问题不在某一行语法上——forappendsave_all,语法都对——而在它把四个"必然会发生的事"全都赌成了"不会发生":它赌循环不会中断(破绽 1,没有断点),赌每一条都不会抛异常(破绽 2,一条炸了全局终止),赌内存里的结果能一直撑到最后(破绽 3),赌 save_all 那一行一定执行得到(破绽 4,没跑完就全丢)。几十条数据时,这四个赌注恰好都赢了;几万条数据、跑上几小时,它们必输无疑。问题的根子清楚了:做对批量推理,第一步不是把循环写得更快,而是承认"这是一个会中途失败的长作业",然后把上面那四个赌注,一个一个换成"失败了也不要紧"的设计。下面五节,就是这件事怎么落地。

二、即时落盘:每处理完一条立刻 checkpoint

四个破绽里,最致命的是"结果攒在内存里、跑完才落盘"——它意味着只要作业没能撑到最后一行,之前所有的成果和花掉的钱就一起清零。所以第一条规矩就是:每处理完一条,它的结果就立刻写到磁盘上去。用 JSONL(每行一个 JSON)这种可追加的格式最合适:

import json

def append_result(path, item_id, output):
    """每处理完一条,立刻追加写一行 —— 进程下一秒被杀,已经写下的也不丢。"""
    with open(path, "a", encoding="utf-8") as f:        # "a" 追加模式:只往后加,不动旧内容
        line = json.dumps({"id": item_id, "output": output}, ensure_ascii=False)
        f.write(line + "\n")                            # 一条结果就是独立的一行

这里的认知要点是:"即时落盘"这条规矩,本质上是在给一个长作业不断地设"存档点(checkpoint)"。它要对抗的,是一个朴素循环最脆弱的假设——"我一定能跑到最后那行 save_all"。这个假设一旦被打破(而对长作业来说它必然被打破),攒在内存里的一切就随进程一起蒸发。即时落盘把这个"全有或全无"的赌局,拆成了几万个"已落袋的就再也丢不了"的小确定:第三千条刚写完磁盘,进程第三千零一条就被杀,你也稳稳保住了三千条成果。这里有两个细节值得说。第一,为什么用 JSONL、为什么用追加模式。批量作业的结果是一条一条产出的,追加写(模式 "a")每次只在文件末尾加一行,既不需要把整个文件读进来再整体重写,也不会因为写到一半崩溃就毁掉前面已写好的内容——它天然适合"边产出边落盘"。而 JSONL 每行一个独立 JSON,坏了某一行也不影响其余行能被解析,比"把几万条塞进一个大 JSON 数组"健壮得多。第二,要明白即时落盘换来的是什么——它换来的不是"作业不会失败",作业照样会失败;它换来的是"失败的代价从'全部'降到了'最后没存下的那一条'"。失败这件事你阻止不了,但你能让每一次失败都只损失极小的一点点。把这一点想通,你就理解了所有长作业容错设计的出发点:不是追求不失败,而是让失败变得廉价。结果即时落盘了,可这些落在磁盘上的"存档",得真的能在重启后被用起来——这是下一节。

三、幂等续跑:重启时跳过已完成的

即时落盘,只解决了"已完成的不丢";它还没解决"重启之后,怎么接着上次的进度往下跑"。如果重启还是从第一条开始,那前面落盘的几千条就要被重新调用、重新花钱。所以第二条规矩是幂等续跑:重启时,先把已经完成的 id 读出来:

def load_done(path):
    """读出结果文件里已经完成的 id 集合 —— 这是"续跑"的唯一依据。"""
    done = set()
    try:
        with open(path, encoding="utf-8") as f:
            for line in f:
                if line.strip():                       # 跳过可能存在的空行
                    done.add(json.loads(line)["id"])
    except FileNotFoundError:
        pass                                           # 第一次跑,结果文件还不存在,正常
    return done

有了"已完成集合",主循环就该先查再做——已经做过的,直接跳过:

def run_batch(items, result_path="results.jsonl", error_path="errors.jsonl"):
    """批量作业主循环:跳过已完成、逐条落盘、失败单独归档。"""
    done = load_done(result_path)                      # 启动先读断点:哪些已经成功了
    total, skipped, failed = len(items), 0, 0
    for i, item in enumerate(items, 1):
        if item["id"] in done:                         # 续跑核心:做过的直接跳,不重复花钱
            skipped += 1
            continue
        r = process_one(item)                          # 单条处理(下一节细讲)
        if r["ok"]:
            append_result(result_path, r["id"], r["output"])
        else:
            append_result(error_path, r["id"], r["error"])   # 失败的进 errors 文件
            failed += 1
        print(f"进度 {i}/{total} 已跳过 {skipped} 失败 {failed}")
    return total, skipped, failed

这里的认知要点是:"幂等续跑"这个词,落到操作上就是一句话:让这个批量作业,无论被中断多少次、重启多少次,最终的效果都和"一次性顺利跑完"完全一样。"幂等"的意思就是"重复执行不会产生额外的副作用"——对批量推理来说,额外的副作用就是"对同一条数据重复调用、重复付费"。要做到幂等,关键是作业得有"记忆":它得知道"哪些已经做过了"。而这个"记忆",不该是另外维护一个进度文件、记一个"我跑到第几条"的计数器——那种计数器很脆弱,它假设数据顺序永远不变,还可能在崩溃的瞬间没来得及更新而和实际进度对不上。真正可靠的"记忆",就是结果文件本身:results.jsonl 里已经躺着的那些 id,就是"已完成"这件事最权威、最不会骗人的证据——因为一个 id 出现在那里,当且仅当它的结果真的被成功写下来了。load_done 做的,就是把这份"证据"读成一个集合,主循环再拿每一条的 id 去集合里一查,就知道该不该跳过。这套"结果文件即进度"的设计还附带了一个好处:它对数据顺序、对你怎么切分批次,统统不敏感——哪怕下次重启时输入数据的顺序变了、甚至你把一大批拆成几小批分别跑,只要 id 不变,已完成的就一定会被正确跳过。一句话:不要去额外记一个"我跑到哪了"的进度,要让"已经产出的结果"自己来回答"哪些做过了"。能跳过已完成的了,可还有个破绽没堵——一条数据出错,凭什么不让它掀翻整个作业?这是下一节。

四、单条失败隔离:别让一条坏数据带崩整个作业

前面主循环里调了一个 process_one,它就是单条失败隔离的关键。规矩是:处理单独一条数据时,无论成功还是失败,都要把结果包成一个"结构化的结果",绝不让异常抛到外层的主循环里去:

def process_one(item):
    """处理单条:成功失败都包成结构化结果,绝不把异常抛到外层主循环。"""
    try:
        output = call_llm(item["text"])
        return {"id": item["id"], "ok": True, "output": output}
    except Exception as e:
        # 关键:一条出错,只是这一条的事 —— 捕获下来、标记成 ok=False,别让它掀翻整个作业
        return {"id": item["id"], "ok": False, "error": repr(e)}

下面这张图,把一条数据在批量作业里走的整条路画出来:

这里的认知要点是:单条失败隔离,要害是在脑子里把两种"失败"彻底分开——"一条数据处理失败"和"整个作业失败",它们是完全不同量级的两件事,绝不能让前者自动升级成后者。一个朴素的 for 循环最危险的地方就在这里:循环体里任何一条抛出未被捕获的异常,异常会顺着调用栈一路冒上去,把整个循环掀翻——于是"第 4001 条这一条数据有问题"这种小事,直接变成了"后面一万多条全部没跑"这种大事故。process_one 这个函数做的,就是在每一条数据的外面套一个"防爆罩":它内部 try except 把一切异常都接住,无论这一条是成功是失败,它对外永远只返回一个规规矩矩的字典——成功就是 ok=True 带着 output,失败就是 ok=False 带着 error。这样一来,外层主循环收到的永远是"数据",而不是"异常",主循环就再也不会被任何一条掀翻了。这里有个观念要扭过来:在批量作业里,"某些条目失败"不是异常,而是正常——几万条真实数据里,总会有内容过长的、格式古怪的、触发模型安全拦截的。你要做的不是"消灭失败",而是"接住失败、记录失败、隔离失败"。所以失败的条目也要即时落盘——把它的 id 和错误原因写进一个单独的 errors 文件。这个 errors 文件有双重价值:一来它让失败的那条数据本身也不会丢,二来它天然就是一份"待重试清单",下一节的重试,靠的就是它。一句话:让单条的失败,永远停在单条;一个 99% 能成功的作业,就该稳稳交付那 99%。作业能正确、能续跑、能隔离失败了,可几万条一条条串着跑还是太慢——下一节讲并发提速。

五、并发与限流:批量作业的速度与安全

一条一条串行地调,几万次网络往返要跑很久。LLM 调用大部分时间花在等网络响应上,正适合用并发——同时发出多个请求。但并发数必须有个上限,这个上限要压在 API 的限流阈值之下:

from concurrent.futures import ThreadPoolExecutor, as_completed

def run_batch_concurrent(items, result_path="results.jsonl",
                         error_path="errors.jsonl", workers=8):
    """并发版:用线程池提速,但并发数 workers 要压在 API 限流之下。"""
    done = load_done(result_path)
    todo = [it for it in items if it["id"] not in done]   # 续跑:只把没做过的提交进去
    with ThreadPoolExecutor(max_workers=workers) as pool:  # workers 就是并发上限
        futures = {pool.submit(process_one, it): it for it in todo}
        for fut in as_completed(futures):                  # 哪条先回来就先处理哪条
            r = fut.result()
            if r["ok"]:
                append_result(result_path, r["id"], r["output"])
            else:
                append_result(error_path, r["id"], r["error"])

这里的认知要点是:并发这件事,在批量推理里要同时握住两头——一头是"提速",一头是"别把自己提到限流里去"。先说为什么并发能提速,而且提速幅度很大。一次 LLM 调用,你的程序绝大部分时间不是在算什么,而是在干等——等请求穿过网络、等对方的模型生成完、等响应再穿回来。串行地跑,意味着这些"干等"的时间一段接一段地累加;而并发地跑,意味着这一条在等的时候,另外七条也在同时等——等待的时间被叠在了一起,总耗时于是被压缩了几倍。所以 LLM 批量调用是并发的绝佳场景。但并发数绝不是越大越好,这就是另一头——限流。任何一个 LLM API 都有限流:每分钟多少次请求、每分钟多少 token,是有明确上限的。你把并发开到一百,看似很猛,实际上大概率会瞬间撞破对方的限流阈值,于是对方开始大批大批地拒绝你的请求、返回 429 错误——你不但没更快,反而因为海量请求失败,变得更慢、更乱。所以 workers 这个并发数,要设成一个"能把你的吞吐顶到接近限流线、但不越过它"的值,它取决于你这个 API key 的具体配额,需要你去查、去测。这里还有一个容易被忽略的点:即时落盘和续跑那套设计,在并发下不能丢。你看 run_batch_concurrent 里,提交任务前依然先 load_done 过滤了一遍,每个任务回来依然是立刻 append 落盘——并发只是改变了"任务怎么被调度执行",它没有、也不该改变"每条结果都立刻落盘、重启都能跳过已完成"这个根基。一句话:用并发把等待时间叠起来换速度,但要把并发数死死压在 API 限流线之下。主干都齐了,最后是几个把批量推理真正用到生产里才会撞见的工程坑。

六、工程坑:失败重试、成本核算、结果校验、可观测

主干之外,还有几个工程坑,不处理就会让你的批量作业在边角上出问题坑 1:失败的条目要能单独重试,而不是整批重来。上一节那个 errors.jsonl,记下了所有失败的 id——它天然就是一份"待重试清单"。重试时,只把这些失败的挑出来再跑一遍:

def retry_errors(items, error_path="errors.jsonl", result_path="results.jsonl"):
    """重试:只把上一轮失败的条目挑出来再跑一遍,而不是整批重来。"""
    failed_ids = load_done(error_path)                  # errors 文件里记的就是失败的 id
    retry_items = [it for it in items if it["id"] in failed_ids]
    open(error_path, "w").close()                       # 清空旧错误档,这一轮重新记
    for item in retry_items:
        r = process_one(item)
        if r["ok"]:
            append_result(result_path, r["id"], r["output"])
        else:
            append_result(error_path, r["id"], r["error"])  # 还失败的,留待下一轮

坑 2:成本要实时核算,别等账单来了才知道。批量作业一跑就是几万次调用,每一次都在花钱。要在跑的过程中,按 token 用量实时累加估算花费,心里有数:

def estimate_cost(usage, price_in_per_1k, price_out_per_1k):
    """按 token 用量估算花费 —— 批量作业必须一边跑一边盯着成本。"""
    # usage 形如 {"prompt_tokens": 1200, "completion_tokens": 300}
    cost_in = usage["prompt_tokens"] / 1000 * price_in_per_1k     # 输入 token 的钱
    cost_out = usage["completion_tokens"] / 1000 * price_out_per_1k # 输出 token 的钱
    return cost_in + cost_out

坑 3:模型"答了"不等于"答对了格式",结果要校验。如果你要求模型输出 JSON,那解析失败、缺字段,都该算这一条失败——别把一条格式不对的输出当成功存进去:

def process_one_validated(item):
    """处理单条并校验输出 —— 模型"答了"不等于"答对了格式"。"""
    try:
        raw = call_llm(item["text"])
        data = json.loads(raw)                  # 要求输出 JSON:解析失败就算这条失败
        if "label" not in data:                 # 再校验必需字段在不在
            raise ValueError("模型输出缺少 label 字段")
        return {"id": item["id"], "ok": True, "output": data}
    except Exception as e:
        return {"id": item["id"], "ok": False, "error": repr(e)}

坑 4:作业跑完要打一份"账",成败一目了然。几万条跑完,你得立刻知道成了多少、败了多少、还差多少,而不是去肉眼数文件:

def report(total, result_path="results.jsonl", error_path="errors.jsonl"):
    """作业跑完打一份账:成了多少、败了多少、还差多少。"""
    ok = len(load_done(result_path))
    bad = len(load_done(error_path))
    print(f"总数 {total} | 成功 {ok} | 失败 {bad} | 未处理 {total - ok - bad}")
    if bad:
        print(f"有 {bad} 条失败,详见 {error_path},可用 retry_errors 单独重跑")

坑 5:重试要带退避,别对着限流硬撞。失败里很大一部分是限流(429)和超时——这类失败,立刻原样重试只会继续撞墙。重试前要等一会儿,而且每重试一次等得更久(指数退避),给对方的限流器留出恢复的时间。坑 6:输入数据的 id 必须稳定且唯一。整套续跑都建立在"用 id 判断这条做没做过"之上——如果 id 是按行号临时生成的,那下次输入顺序一变,id 全错位,续跑就全乱了。id 要来自数据本身、稳定不变(比如数据库主键、内容哈希)。坑 7:别把超大输入闷头塞给模型。几万条真实数据里,总有那么几条长得离谱——直接塞过去要么超出上下文窗口报错、要么烧掉惊人的 token。在调用之前就该按长度筛一道:过长的截断,或单独挑出来另作处理。坑 8:大批量作业,结果别只写一个文件。几十万条结果全堆进一个 jsonl,文件会大到难以处理、load_done 每次启动都要全文件扫一遍。量大时要按批次分文件(比如每一万条一个文件),或把"已完成 id"单独存成一份索引。

关键概念速查

概念 / 手段 说明
作业不是循环 几万次 LLM 调用是会中途失败的长作业,非一段计算
循环挨个调的错 赌全程不中断、不抛错、内存撑到底,长作业必输
即时落盘 每条结果立刻追加写盘,失败代价从"全部"降到"一条"
JSONL 追加格式 每行一个 JSON,可追加、坏一行不毁全文件
幂等续跑 重启读已完成 id,跳过做过的,效果等同一次跑完
结果文件即进度 已落盘的 id 就是最权威的进度,不另记计数器
单条失败隔离 异常包成结构化结果,一条失败不掀翻整个作业
errors 即重试清单 失败条目落进单独文件,天然是待重试清单
并发压在限流下 并发叠起等待时间提速,但并发数不得越过限流线
成本实时核算 按 token 用量边跑边估花费,别等账单才知道

避坑清单

  1. 把几万次 LLM 调用当"会中途失败的长作业",不是一段顺跑的循环。
  2. 每处理完一条,结果立刻追加落盘,绝不把结果攒在内存里等跑完。
  3. 结果用 JSONL 每行一条,可追加、坏一行不影响其余行。
  4. 重启时先读已完成的 id 集合,跳过做过的,只跑没跑过的。
  5. 用结果文件本身当进度依据,别另记一个脆弱的"跑到第几条"。
  6. 单条处理用 try except 包住,异常包成结构化结果,不抛给主循环。
  7. 失败条目连同错误原因写进单独的 errors 文件,作为待重试清单。
  8. 用并发提速,但并发数压在 API 限流阈值之下,别撞出大批 429。
  9. 重试要带指数退避,边跑边按 token 核算成本,跑完打一份成败账。
  10. 输入 id 要稳定唯一,超长输入先筛一道,结果量大时按批次分文件。

总结

回头看那串"跑到第 4000 条崩了结果全丢、限流终止整批没跑、不知道跑到哪、几条坏数据带崩全局"的问题,以及我后来在批量推理上接连踩的坑,最该记住的不是某一个函数的写法,而是我动手前那个想当然的判断——"批量跑一批 LLM 请求,就是写个 for 循环挨个调"。这句话错在它把"批量推理"和一段普通的内存循环,当成了同一种东西。我以为循环挨个调、攒进列表、跑完一存,这件事就办成了。可我忽略了一件最要紧的事:几万次大模型 API 调用,根本不是一段"短小、可靠、原子"的计算,而是一个要跑几小时、必然会中途失败、而且可以部分完成的长作业。一个朴素的 for 循环,骨子里赌的是"全程顺利跑完";而一个长作业,中途出岔子不是万一,是必然。这个错配,本地用几十条数据测时根本看不出来——几十次调用跑得快、撞上失败的概率又低,它恰好把那个"全程顺利"的赌注赢了下来;它只会在真正跑那几万条、跑上几小时的时候,以"崩在半路、成果归零"的方式爆出来。

所以做对批量推理,真正的功夫不在"把循环写得更快"那几行上。循环本身不难。真正的功夫,在于你要从一开始就承认"这是一个会中途失败的长作业",然后把那个朴素循环的每一个赌注,都换成"失败了也不要紧"的设计:你不能赌内存撑到最后,就每处理完一条立刻把结果落盘;你不能赌作业一次跑完,就让它能读着断点、跳过已完成的接着跑;你不能赌每一条都不出错,就把单条的失败用 try except 隔离起来、单独归档;你想用并发提速,就把并发数死死压在 API 限流线之下;而到了失败重试、成本核算、结果校验这些边角上,你还要处处守住,别让作业又在某个角落功亏一篑。这篇文章的几节,其实就是顺着这套规矩展开的:先想清楚"循环挨个调"为什么错,再讲即时落盘、幂等续跑、单条失败隔离、并发与限流,最后是重试、成本、校验、可观测这几个把批量作业守扎实的工程细节。

你会发现,批量推理这件事,和现实里"一个人怎么寄出一万封信"完全相通。一个不靠谱的人会怎么寄?他把一万封信一股脑全抱在怀里,挨家挨户地塞邮筒,塞一封从怀里抽一封。塞到第四千封时,他脚下一滑摔了一跤,怀里剩下的六千封连同记不清的那四千封,全撒了一地、混作一团——他根本说不清哪些已经寄出去了、哪些还没,只能把这一万封从头再寄一遍,前四千封的邮票钱白花了;更糟的是,中间有几封地址写得有问题,他在那几封上卡住、急得团团转,结果剩下九千多封好信也跟着耽搁了。而一个靠谱的人怎么做?他每寄出去一封,就立刻在一个本子上记下这封的编号(这就是即时落盘);万一中途摔了、第二天接着干,他先翻一遍本子,凡是记过的就直接跳过(这就是幂等续跑);遇到地址有问题的那几封,他不在那儿干耗,而是把它们单独挑进一个小袋子,贴上"待核对",先把其余九千多封好信寄完,回头再单独处理那一小袋(这就是单条失败隔离);他还找来几个帮手一起跑,但人数不会多到把小小的邮局柜台挤爆(这就是并发压在限流之下)。同样是寄一万封信,不靠谱的人赌自己一趟不会摔、每封都没问题,靠谱的人则一开始就认定"必然会摔、必然有几封有问题",于是每一步都为"摔了之后还能接着干"做好了准备——差别不在"寄信这件事本身难不难",只在寄信的人心里有没有"这是个会中途出岔子的长差事"这根弦

最后想说,批量推理做没做对,差距永远不会在"本地拿几十条数据测一测"时暴露——本地几十次调用,跑得飞快、又几乎不会撞上失败,你那个朴素的 for 循环恰好一次性顺顺利利跑完了,结果整整齐齐落进文件,你自然觉得"批量嘛,循环挨个调"一点问题都没有。它只在真实的、要跑几万条几小时、会撞限流会被中断的生产作业里才显形。那时候它会用最难堪的方式给你结账:做不好,你会因为进程崩在半路,眼睁睁看着几千条结果和那一笔 API 费用一起归零,会因为一次限流,让整个作业当场终止、后面一万多条根本没跑,会因为几条坏数据,让一个本该 99% 成功的作业一条结果都产不出来;而做了,你的作业每跑完一条就稳稳落袋一条,被中断多少次都能从断点接着跑,几条坏数据被干净利落地隔离进重试清单,几万条数据无论跑多久、断多少回,最终都精确无误地交付出来。所以别等"跑到一半崩了、成果归零"那一刻找上门,在你写下批量调用的第一个 for 时就该想清楚:这里每条结果落盘了吗、崩了能续跑吗、一条失败会不会带崩全局、并发压在限流下了吗、成本我盯着吗,这一道道关口,我是不是都替这个长作业守住了?这些问题有了答案,你交付的才不只是一套"本地几十条看着对"的代码,而是一个无论数据有多少、作业被打断多少回,每一条都不丢、不重、不漏的、让人放心的批量系统。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

字符编码与乱码完全指南:从一次"数据库里的中文全变成了问号"看懂 UTF-8、字节与编码声明

2026-5-22 15:50:16

技术教程

文件上传安全完全指南:从一次"一个大文件把服务器内存撑爆"看懂大小校验、类型验证与路径穿越

2026-5-22 16:06:02

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索