2024 年我做一个批量分析功能——产品攒下了几万条用户评论,要给每一条都调一次大模型,分析出它的情绪、提到的功能点、是不是一条有效反馈。第一版我做得很顺手:写一个循环,遍历每一条评论,循环体里调一次大模型的 API,把返回的分析结果存进数据库。本地我拿几十条评论测了一遍,循环顺畅地跑完,结果也对,我心里很笃定:这不就是个"循环里调 API"的活儿嘛,数据多,无非多跑一会儿,这批量功能稳了。可等我拿真实的几万条数据跑起来,一串问题冒了出来。第一种最先把我打懵:循环才跑了几百条,API 突然开始大面积报错——一个叫 429 Too Many Requests 的错误刷了屏,整个批量任务当场中断,几万条数据才处理了个零头。第二种最难缠:我想,既然串行太慢,那就并发——我把循环改成一次性发起几百个并发请求。速度一开始确实窜上去了,可没撑多久,报错来得比串行时还密、还凶,而且这次除了 429,还多了一大批"请求超时"。第三种最头疼:我给每个请求加了"失败就重试",想着重试总能兜住。结果某次任务,跑得比不加重试还慢——我盯着日志才反应过来,重试的请求和新请求挤在一起,把本就拥堵的通道堵得更死了。第四种最莫名其妙:同样的几万条数据、同样的代码,我跑了两次,一次二十多分钟跑完,另一次跑了将近两小时,中间我什么都没改,快慢毫无规律。我盯着这一连串问题想了很久,才彻底想明白:第一版错在一个根本的认知上。我以为调用大模型的 API,和调用一个公司内部的普通接口没什么两样——它就是个函数,我传参数进去、拿结果出来;想让批量任务快,无非就是多开点并发把循环跑快,某个请求失败了重试一下,本质上是个"怎么把循环跑得更快"的问题。可这个认知是错的。大模型 API 不是一个"你想调多快就能调多快"的接口。它有明确而严格的速率限制:服务商会限制你每分钟最多发起多少个请求、每分钟最多消耗多少 token,一旦超过,它不会排队等你,而是直接拒绝(就是那个 429)。它的单次响应也很慢——一次调用动辄几秒、十几秒,而且这个耗时还很不稳定。在这样一个"有速率上限、又慢又不稳"的接口上,"多开并发"非但不会更快,反而会让你更快地撞上限流的墙;"猛重试"非但兜不住,反而会火上浇油。所以高并发地调用 LLM,根上不是"把循环跑快"这一个动作,而是一整套工程:要先搞清楚自己的速率上限究竟在哪、要把发出去的并发牢牢控制在这个上限之内、要用带退避的重试而不是莽撞的猛重试、要给慢请求设超时不让它拖垮全局、要让所有请求在一个有节奏的队列里平稳地流出去。本文从头梳理:为什么"多开并发"反而更慢更容易崩,服务商的限流到底在限你什么,并发该怎么控制,重试该怎么做,慢请求怎么办,以及一些把 LLM 高并发调用做扎实要避开的工程坑。
问题背景
先把大模型 API 的两个关键特性说清楚。第一,它有速率限制(rate limit):服务商按你的账号等级,给你设定了每分钟请求数(RPM,Requests Per Minute)和每分钟 token 数(TPM,Tokens Per Minute)的上限,你的调用速度一旦顶到这两个上限的任意一个,后续请求就会被拒绝,返回 HTTP 429。第二,它单次调用的延迟很高且不稳定:模型要逐个 token 地生成回答,一次调用耗时几秒到几十秒是常态,且同样的请求,这次和下次的耗时可能差很多。
错误认知是:大模型 API 是个普通接口,多开并发就能更快,失败重试就能兜住。真相是:它是一个有硬性速率上限、且单次很慢的接口,并发必须被精确地控制在上限之内,重试必须克制,否则就是自己把自己挤垮。把这一点摊开,第一版的几类问题就都能解释了:
- 串行循环也撞 429:你以为串行很慢、不可能超速,但模型生成快的时候,串行的请求频率照样可能顶破 RPM。
- 并发开大反而更崩:并发数超过速率上限后,多出来的请求不会被处理,只会变成一堆 429,和因为排队太久而起的超时。
- 猛重试越重试越慢:失败的请求立刻重试,等于在已经拥堵的通道里又塞进更多请求,拥堵进一步加剧。
- 同样的任务时快时慢:任务耗时取决于"你撞了多少次限流、重试了多少次",而这高度依赖运行时的并发时序,所以毫无规律。
所以让批量调用 LLM 又快又稳,核心不是"把循环跑快",而是一整套调用工程:摸清速率上限、把并发控制在上限内、用退避重试、给慢请求设超时。下面六节,就从第一版"多开并发就能更快"的想当然讲起。
一、为什么"多开并发"反而更慢更容易崩
第一版的批量调用,就是一个最朴素的串行循环。
# 反面教材一:串行循环,一条评论调一次
def analyze_all_v1(comments):
results = []
for comment in comments:
# 一条评论调一次大模型,等它返回,再处理下一条
result = call_llm(build_prompt(comment))
results.append(result)
return results
# 我以为串行很慢、但至少很安全 —— 一次只有一个请求在飞,
# 怎么可能超速?可如果模型响应快(比如每次 1 秒),
# 这个循环一分钟能发 60 个请求 —— 一样可能顶破 RPM 上限。
串行太慢,我于是把它改成并发,而且是最暴力的那种并发——一次性把所有请求全发出去。
# 反面教材二:一股脑发起几百个并发,以为越并发越快
import asyncio
async def analyze_all_v2(comments):
# 给每一条评论都创建一个协程,一次性全部发出去
tasks = [call_llm_async(build_prompt(c)) for c in comments]
# 几万条评论 = 几万个请求,在同一瞬间涌向 API
return await asyncio.gather(*tasks)
# 我以为:并发越高越快。
# 实际上:并发数一旦超过服务商给我的速率上限,
# 多出来的请求不会被处理,只会变成 429 和超时 ——
# 我不是在"调得更快",我是在"更快地撞墙"。
这里要看清一个反直觉的事实:并发数和"有效吞吐"之间,不是一直成正比的。在速率上限以内,你加并发,吞吐确实涨;可一旦并发数越过了上限,吞吐就再也不涨了——多出来的那些并发请求,不会被处理成有效结果,它们只会变成 429 错误。更糟的是,这些被拒的请求并不是"安静地失败",它们会触发重试、会和正常请求挤占网络和连接资源,把本来还顺畅的那部分也拖慢。所以"并发数 500"和"并发数 50(刚好是上限)"相比,有效吞吐一模一样,但前者额外背上了 450 次失败、450 次重试和一身拥堵。
这一节要建立的认知是:调用大模型 API 的吞吐量,根本不由"你开了多少并发"决定,而由"服务商给你的速率上限"决定——并发数只是你手里拧动的旋钮,而上限是一堵墙,旋钮拧过了墙,撞上去的是你自己。第一版"多开并发就更快"的想法,背后藏着一个隐含假设:吞吐量是我这边能决定的,我这边发得越多,那边就处理得越多。但这个假设对大模型 API 不成立。大模型 API 的处理能力,是服务商那边设定好的一个固定值(就是 RPM 和 TPM),它不会因为你猛发就变大。你能控制的,只是"以多快的速度把请求送到那堵墙前面"——送得比墙能吸收的慢,一切顺畅;送得比墙快,多出来的请求就堆在墙根下,变成 429。所以优化批量调用,第一件事不是"怎么把并发开到最大",而是彻底掉转思路:先去搞清楚那堵墙在哪,然后让你的发送速度,精确地、稳稳地贴着墙,既不越过、也不浪费。摸清这堵墙,就是下一节的事。
二、看懂限流:RPM 与 TPM,服务商在限你什么
要把并发控制在上限内,你得先知道上限是多少、以及它到底怎么算。服务商通常用两个指标限你:RPM(每分钟请求数)和 TPM(每分钟 token 数)。这两个指标,任意一个被顶破,都会触发 429。
# 服务商的两道限流闸:RPM 和 TPM,任意一个超了都会被拒
# 假设你的账号额度:
RPM_LIMIT = 500 # 每分钟最多 500 个请求
TPM_LIMIT = 200000 # 每分钟最多 20 万 token
# 关键:这两道闸是"且"的关系 —— 哪个先到,哪个就先拦你。
def which_limit_bites_first(avg_tokens_per_req):
# 估算:RPM 和 TPM,哪个会先成为你的瓶颈
reqs_allowed_by_tpm = TPM_LIMIT / avg_tokens_per_req
if reqs_allowed_by_tpm < RPM_LIMIT:
return "TPM 先到瓶颈"
return "RPM 先到瓶颈"
# 场景 A:每个请求很小(平均 200 token)
# 200000 / 200 = 1000,比 RPM 的 500 大 —— RPM 先成为瓶颈
# 场景 B:每个请求很大(平均 2000 token,长上下文 RAG 常见)
# 200000 / 2000 = 100,远小于 RPM 的 500 —— TPM 先成为瓶颈
# 这时你连 RPM 的五分之一都没用到,就已经被限流了
这件事很多人会忽略:大家盯着 RPM,以为"我每分钟没发那么多请求就安全",可如果你的请求都很大(长上下文的 RAG 应用尤其如此),TPM 会远早于 RPM 触顶。光数请求个数是不够的,你得同时盯着 token 的消耗速度。而当你真的撞上 429 时,响应里其实藏着关键信息——别只看到"被拒了"就完事。
# 429 响应里藏着关键信息:别只看到"被拒了",要读它的响应头
def handle_429(response):
# Retry-After:服务商明确告诉你"过多少秒再来"
retry_after = response.headers.get("Retry-After")
# 这些头(不同服务商名字略有差异)告诉你额度还剩多少
remaining_req = response.headers.get("x-ratelimit-remaining-requests")
remaining_tok = response.headers.get("x-ratelimit-remaining-tokens")
# 正确做法:严格按 Retry-After 等待,而不是自己瞎猜一个时间
return float(retry_after) if retry_after else None
# 第一版对 429 的处理是"立刻重试",完全无视了 Retry-After ——
# 服务商已经把"你该等多久"的答案写在响应头里了,我却没去读。
这一节的认知是:限流不是服务商在"为难你",它是服务商主动交给你的一份契约——一份关于"你现在还能调多快"的、实时更新的契约;RPM 和 TPM 是契约的固定条款,429 和 Retry-After 是契约的实时播报。第一版的错,是把这份契约当成了噪音:它把 429 当成一个"碍事的报错",当成一个需要"绕过去"的障碍,于是它的反应是"再试一次、再绕一次"。但 429 不是障碍,它是信息——它在清清楚楚地告诉你:你现在的速度超了,而且(通过 Retry-After)告诉你超了多少、该歇多久。一旦你把限流从"敌人"重新理解成"服务商发给你的调度指令",你的整个做法都会变:你不会再去"对抗"它,而会去"读懂"它、"配合"它。你会先根据自己请求的平均大小,算清楚到底是 RPM 还是 TPM 会先卡住你;你会在每个 429 响应里读出 Retry-After,精确地知道该等多久。把限流当契约读懂了,你才有资格去谈下一步——怎么让自己的发送速度,主动地贴合这份契约。
三、并发控制:用一个有上限的闸门,而不是无脑开
知道了上限,就要把并发牢牢地控制在上限之内。做法是:不要一次性把所有请求都发出去,而是设一个"并发闸门"——同一时刻,最多只允许固定数量的请求在飞,一个完成了,才放下一个进来。在 asyncio 里,这个闸门就是信号量 Semaphore。
# 并发闸门:用 Semaphore 把"同时在飞的请求数"卡在一个上限内
import asyncio
async def analyze_all_v3(comments, max_concurrency=50):
# 信号量 = 一个有 50 个名额的闸门
sem = asyncio.Semaphore(max_concurrency)
async def one(comment):
async with sem: # 抢一个名额,抢不到就在这里等
return await call_llm_async(build_prompt(comment))
# 离开 async with,名额自动归还,下一个请求才能进来
# 仍然给每条评论建一个协程,但它们会被闸门卡住、排队进入
tasks = [one(c) for c in comments]
return await asyncio.gather(*tasks)
# 和反面教材二的区别:那里几万个请求同时涌出;
# 这里任意时刻最多 50 个在飞,其余的乖乖排队 —— 平稳可控。
但只有"并发闸门"还不够。闸门控制的是"同时在飞多少个",可它没有直接控制"每秒发出去多少个"——如果模型响应很快,50 个并发的请求飞快地一轮轮完成,每秒发出的请求数照样可能顶破 RPM。所以还要再加一道"令牌桶"限速器,直接按 RPM 换算出的速率来放行请求。
# 比固定并发更稳:用令牌桶给请求"按速率"放行
import asyncio, time
class RateLimiter:
def __init__(self, rate_per_sec):
self.rate = rate_per_sec # 每秒允许放行多少个请求
self.allowance = rate_per_sec
self.last = time.monotonic()
async def acquire(self):
while True:
now = time.monotonic()
# 按流逝的时间,往桶里补充令牌
self.allowance += (now - self.last) * self.rate
self.last = now
if self.allowance > self.rate:
self.allowance = self.rate # 桶有上限,不会无限攒
if self.allowance >= 1:
self.allowance -= 1 # 取一个令牌,放行
return
# 没令牌了,等到攒够一个再说
await asyncio.sleep((1 - self.allowance) / self.rate)
# RPM 上限 500,换算成每秒就是 500/60,约等于 8.3。
# 闸门控制"同时在飞多少",令牌桶控制"每秒放出去多少",
# 两者配合,才能既不超 RPM、又把并发利用得足够满。
这一节的认知是:并发控制的目标,从来不是"尽量多",而是"尽量稳地贴着上限"——你要的不是一个能开到最大的旋钮,而是一个能把流量精确地稳定在某个值上的阀门。第一版的并发是一个旋钮——你拧到哪儿是哪儿,拧到底就是几万个请求齐发。而正确的并发控制,是一个阀门:它的作用是把流量"夹住",让它无论上游有多少请求等着、模型响应忽快忽慢,实际流出去的速度都稳稳地维持在你设定的那个值。这里需要两个互补的机制,因为"速率"这件事有两个维度:一个是"同一时刻有多少个在飞"(并发数,由 Semaphore 闸门控制),一个是"单位时间放出去多少个"(频率,由令牌桶控制)。只控制并发数不行——并发低但每个都秒回,频率照样超;只控制频率也不踏实——万一某些请求特别慢,在飞的会越积越多。两道闸一起上,你才真正握住了那个阀门。握住了阀门,你就能让发送速度精确地贴着上一节摸清的那堵墙——这才是高并发调用的稳定状态。
四、重试:退避、抖动,以及为什么不能猛重试
并发控制好了,429 会大幅减少,但不会绝对消失——服务商额度的瞬时波动、你对平均 token 的估算误差,都会让偶尔几个请求漏过去撞上限流。所以还需要重试。但第一版第三种问题——猛重试越重试越慢——是一个明确的警告:重试必须克制。正确的重试,是带指数退避和随机抖动的,而且要尊重服务商给的 Retry-After。
# 重试:指数退避 + 抖动,并且尊重服务商给的 Retry-After
import asyncio, random
async def call_with_retry(comment, max_retries=5):
for attempt in range(max_retries):
try:
return await call_llm_async(build_prompt(comment))
except RateLimitError as e: # 收到 429
if attempt == max_retries - 1:
raise # 重试次数用尽,放弃这一条
# 优先用服务商明确给出的 Retry-After
wait = e.retry_after
if wait is None:
# 没给,就指数退避:1s, 2s, 4s, 8s...
wait = 2 ** attempt
# 叠加随机抖动:避免一批同时失败的请求又同时重试
wait += random.random()
await asyncio.sleep(wait)
except (TimeoutError, ConnectionError):
await asyncio.sleep(2 ** attempt + random.random())
# 第一版"立刻重试"的恶性循环:一批请求同时撞 429、
# 又同时立刻重试,等于把同一个洪峰一次次原样重放 ——
# 退避把重试推迟、抖动把它们打散,洪峰才会被真正摊平。
但重试单个请求,只是治标。如果你发现 429 不是偶发、而是持续地、密集地出现,那说明的不是"某几个请求运气差",而是"你整个管道的发送速度,系统性地超了"。这时该做的,不是更努力地重试,而是主动把整个管道调慢——这叫自适应并发。
# 比单纯重试更进一步:频繁撞 429 时,主动调慢整个管道
class AdaptiveConcurrency:
def __init__(self, initial=50, floor=5, ceiling=200):
self.limit = initial
self.floor, self.ceiling = floor, ceiling
def on_rate_limited(self):
# 撞到 429:把并发上限砍半(乘性减少,迅速退让)
self.limit = max(self.floor, self.limit // 2)
def on_success_streak(self):
# 连续成功一段时间:小幅把上限加回去(加性增加,谨慎试探)
self.limit = min(self.ceiling, self.limit + 5)
# 重试是"针对单个请求"的补救;
# 自适应并发是"针对整个管道"的调节 ——
# 撞墙了就大步退,顺畅了就小步进,系统自己收敛到稳定点。
这一节的认知是:重试其实有两种,一种是"修复一个失败的请求",一种是"修正整个系统的发送速率"——第一版只做了前者,而密集的 429 真正在喊的,是后者。第一版把所有的 429 都当成"单个请求的偶然失败"来处理,于是它的全部应对就是"把这个请求再发一次"。这对真正偶发的失败是对的。但当 429 变得密集,它的含义就变了:它不再是某个请求的运气问题,而是一个系统级的信号——你的管道整体太快了。对这个系统级信号,你却用"重试单个请求"去回应,就完全文不对题:你把每个超速产生的失败都重试一遍,无非是把超速的流量原样又发了一遍,你不是在修正超速,你是在维持超速。正确的回应,是让系统层面的问题在系统层面解决:撞墙密集,就果断把整体并发砍下来(乘性减少,because超速是危险的、要快速脱离);平稳一段时间了,再小步把并发加回去试探(加性增加,because试探要谨慎)。这套"快退慢进"的自适应,会让你的管道自动收敛到那个"刚好不撞墙"的速度上。把"重试"和"调速"分清楚——前者管单个请求,后者管整个管道——你才算真正接住了 429 这个信号。
五、超时与降级:慢请求不能拖垮整个任务
前面几节解决了"发得太快"。但第一版第四种问题——任务时快时慢、毫无规律——还没解决。这背后,是大模型单次调用延迟的高度不稳定:大多数请求几秒返回,但总有少数会拖到几十秒,甚至卡住迟迟不返回。一个批量任务,如果被这些"长尾慢请求"卡住,整体耗时就会失控。第一道对策,是给每个请求设一个硬超时。
# 给每个请求设硬超时:慢请求不能无限期地拖着
async def call_with_timeout(comment, timeout=30):
try:
# 30 秒还没返回,就主动放弃这次调用
return await asyncio.wait_for(
call_llm_async(build_prompt(comment)), timeout=timeout)
except asyncio.TimeoutError:
# 超时的请求,当成一次失败去重试,而不是干等下去
raise CallTimeout(comment)
# 没有超时,一个卡死的请求会一直占着并发闸门的一个名额,
# 相当于你的有效并发被悄悄"漏"掉了一格、两格…… 越漏越少,
# 整个任务就这样不知不觉地越跑越慢 —— 这正是"时快时慢"的真相。
第二道对策,是接受"批量任务里一定会有失败",别让个别条目的失败拖垮全局。第一版用 asyncio.gather 默认行为,任何一个协程抛异常,整个 gather 就崩了——几万条数据,绝不能因为某一条就全盘中断。
# 批量任务:允许部分失败,把失败的单独收集,绝不中断全局
async def analyze_all_v4(comments, max_concurrency=50):
sem = asyncio.Semaphore(max_concurrency)
ok, failed = [], []
async def one(comment):
async with sem:
try:
r = await call_with_retry(comment)
ok.append((comment, r))
except Exception:
# 这一条彻底失败:记下来,但不向上抛、不中断别的
failed.append(comment)
await asyncio.gather(*[one(c) for c in comments])
# 失败的条目集中起来,任务结束后再单独跑一轮,或转人工
return ok, failed
# 第一版里任何一条出错,都会让 gather 整体抛异常、任务中断。
# 几万条数据,绝不能因为某一条的失败就让前面的成果全白费。
这一节的认知是:批量调用 LLM,你要从"追求每一条都成功"的心态,转向"接受一定比例的失败、并让失败可控、可隔离、可收集"的心态——因为大模型调用的慢和不稳,是你无法消除的常态,你能做的不是消灭它,而是不让它扩散。第一版的代码里藏着一个完美主义的预设:每一条都应该成功,所以任何一条失败都是"异常",都该让程序停下来报警。这个预设对一个调用稳定内部接口的循环也许成立,但对大模型批量调用是致命的——当你有几万次调用,而每一次都有几秒到几十秒的不稳定延迟、都有撞上 429 的可能,那么"全部成功、无一例外"根本不是一个现实的目标。一旦你还抱着这个目标,你的程序就会非常脆弱:它会被第一个长尾慢请求拖住,会被第一个最终失败的条目整体打断。转换心态之后,你的设计目标就变了——不再是"零失败",而是"失败被关在小盒子里":用超时,把"慢"这个问题关在单个请求里,不让它漏占并发名额;用部分失败收集,把"失败"这个问题关在单个条目里,不让它中断整个任务。慢和失败照样会发生,但它们被隔离了、被记录了、可以事后单独处理。能扛住真实规模的批量任务,拼的不是"不出错",而是"出错了也不扩散"。
把一条评论从进入批量任务到产出结果的完整处理流程画出来,就是下面这张图:
[mermaid]
flowchart TD
A[一条评论进入任务] --> B[在并发闸门前排队]
B --> C[拿到名额 发起调用]
C --> D{这次调用结果如何}
D -->|成功| E[结果存入成功列表]
D -->|超时或 429| F{重试次数还够吗}
F -->|还够| G[按退避加抖动等待后重试]
G --> C
F -->|用尽| H[记入失败列表 不中断全局]
六、把 LLM 高并发调用做扎实,要避开的工程坑
前面五节讲清了高并发调用的几条主轴。但要在生产里真正用好,还有几个工程坑得专门讲。第一个,也是最容易被忽略的:高并发会让烧钱的速度也"高并发"。并发调用让花钱的速度成倍上升,一个失控的批量任务,几分钟就能烧掉一笔计划外的开销,所以要给整个任务设一个 token 预算上限。
# 坑一:高并发也会让烧钱高并发 —— 给批量任务设个总预算闸
class BudgetGuard:
def __init__(self, max_total_tokens):
self.max = max_total_tokens
self.used = 0
def check_and_add(self, tokens):
if self.used + tokens > self.max:
# 整个任务的 token 预算用完了,主动停下来
raise BudgetExceeded(f"已用 {self.used},超出预算 {self.max}")
self.used += tokens
# 高并发跑批,出问题时烧钱的速度也是高并发的 ——
# 一个陷入疯狂重试的任务,能在很短时间里烧掉一大笔。
# 预算闸是最后一道"别让账单失控"的保险。
第二个坑,是长任务的进度没法保存。几万条数据跑好几个小时,中途万一中断(崩溃、被限流卡死、手动停掉),必须能从断点接着跑,而不是从头再来。
# 坑二:长批量任务要支持断点续跑,别一中断就从头再来
def load_done_ids(checkpoint_file):
# 已成功处理的条目 id,每处理成功一条就追加记一条
try:
with open(checkpoint_file) as f:
return set(line.strip() for line in f)
except FileNotFoundError:
return set()
def filter_pending(comments, checkpoint_file):
done = load_done_ids(checkpoint_file)
# 只处理还没完成的 —— 任务中断后重跑,自动跳过已完成的
return [c for c in comments if c.id not in done]
# 没有断点续跑,一个跑了一小时的任务在第 59 分钟崩了,
# 你就得从第一条重新开始 —— 时间和钱,都白烧了。
还有几个坑值得点一下。其一,不同服务商、甚至同一服务商的不同模型,RPM 和 TPM 额度都不一样,并发和限速参数千万不能写死在代码里,要做成可配置的。其二,流式输出(SSE)在高并发批处理里要谨慎——它会让每个连接占用更久,反而压低你实际能跑的并发,批处理场景通常用非流式更合适。其三,本地开发用的低额度账号和生产用的高额度账号,限流参数差很多,一定要在接近生产的额度下做压测,别拿本地的体感去估生产。下面把高并发调用 LLM 的几条主轴集中对照一下:
LLM 高并发调用的几条主轴对照
手段 解决什么问题 核心要点
--------------------------------------------------------------
并发闸门 同时在飞的请求过多 信号量卡住并发数
令牌桶限速 每秒发出的请求过快 按 RPM 换算的速率放行
退避重试 偶发的 429 与超时 指数退避加抖动 尊重 Retry-After
自适应并发 持续密集地撞限流 撞墙乘性减 顺畅加性增
请求超时 长尾慢请求拖垮全局 每请求设硬超时 超时即重试
断点续跑 长任务中途崩溃 记录已完成 重跑时跳过
原则:并发不是越高越好,而是越稳越好 ——
贴着速率上限平稳地跑,比忽快忽崩地猛冲快得多。
这一节这几个坑,串起来是同一个意思:高并发调用 LLM,你管理的根本不是"一次 API 调用",而是"一个会持续好几个小时、会消耗大量金钱、随时可能被限流和中断的长任务"。第一版把它当成"一个循环",于是循环之外的一切都不在视野里:循环跑多久、跑这一趟要花多少钱、跑到一半崩了怎么办——这些问题,第一版一个都没问。但它们恰恰是一个真实批量任务的核心问题。一次 API 调用是"瞬时"的,你管好它的成功与失败就够了;而一个长任务是"持续"的,它有时间维度(要几个小时,所以要能断点续跑)、有成本维度(累积消耗巨大,所以要有预算闸)、有环境维度(不同模型不同账号额度不同,所以参数要可配)。把高并发调用从"一个循环"重新理解成"一个需要被运维的长任务",你才会去给它配上预算、断点、可配置参数这些"任务级"的设施。这些设施没有一个是为了让单次调用更快,它们全都是为了让这个长达数小时的任务,能够安全地、可控地、不留烂摊子地跑完。
关键概念速查
| 概念 | 说明 |
|---|---|
| 速率限制 | 服务商对调用速度的硬性约束,超过即拒绝请求 |
| RPM | 每分钟请求数上限 |
| TPM | 每分钟 token 数上限,长上下文场景常先于 RPM 触顶 |
| HTTP 429 | Too Many Requests,顶破限流时服务商返回的错误 |
| Retry-After | 429 响应头,明确告知应等待多少秒后再重试 |
| 并发闸门 | 用信号量限制同一时刻在飞的请求数量 |
| 令牌桶 | 按固定速率放行请求,把发送速度卡在 RPM 之内 |
| 指数退避 | 重试等待时间逐次翻倍,避免重试形成新的洪峰 |
| 自适应并发 | 撞限流就降并发、顺畅就升并发,自动收敛到稳定点 |
| 断点续跑 | 记录已完成条目,任务中断后重跑可跳过它们 |
避坑清单
- 不要以为多开并发就更快:并发超过速率上限只会更崩。
- 不要以为串行就一定安全:模型响应快时,串行也能顶破 RPM。
- 不要无视 429 的响应头:Retry-After 已经告诉你该等多久。
- 不要只盯 RPM:长上下文场景往往 TPM 先成为瓶颈。
- 不要失败就立刻重试:用指数退避加抖动,否则重试变洪峰。
- 不要只会重试单个请求:频繁撞 429 要调慢整个管道。
- 不要不给请求设超时:一个卡死的请求会悄悄漏掉并发名额。
- 不要让一条失败中断全局:批量任务要允许部分失败、收集失败。
- 不要把并发参数写死:不同模型不同账号的额度差很多。
- 不要不做断点续跑:长任务中途崩了不能从头再来。
总结
回头看第一版那个"循环里调 API"的批量功能,它的错误很典型。它不在某一行代码,而在一个对大模型 API 的根本误解:以为它是个普通接口,多开并发就更快、失败重试就兜得住。真相是,大模型 API 是一个有硬性速率上限(RPM 和 TPM)、单次调用又慢又不稳的接口。在这样的接口上,并发开过了上限只会更快地撞墙,猛重试只会火上浇油。它的吞吐量,从来不由你开多少并发决定,而由服务商给你的速率上限决定。
而把高并发调用做对,工程量并不小。它不是"把循环改成并发"那么简单,而是要先摸清 RPM 和 TPM 的上限、要用并发闸门和令牌桶把流量精确地稳在上限内、要用带退避和抖动的重试、要在持续撞限流时自适应地调慢管道、要给每个请求设超时、要让批量任务允许部分失败并能断点续跑。一套真正又快又稳的批量调用,是这些环节一个不少地拼起来的。
这件事其实很像一大批车要通过一个收费站。第一版的想法是"车越多、开得越猛,过得越快"。可收费站的窗口数量是固定的,这就是速率上限——你把几百辆车一股脑全堵到收费站口,窗口还是只能一辆一辆地放,多出来的车不会更快通过,只会在入口挤成一团,谁也动不了,这就是并发开过头反而更崩。聪明的做法,是在收费站前面设一个匝道信号灯,让车按窗口能处理的节奏、一辆一辆有间隔地汇入,这就是限速和并发闸门。万一前方真堵上了,也不是让后面的车一个劲猛按喇叭往前挤,这就是猛重试;而是都先稳住、错开时间再走,这就是退避重试。还有那些半路抛锚的车,要赶紧拖到应急车道去,别让它堵死一整条道,这就是给慢请求设超时。车流要快,靠的从来不是"每辆车都把油门踩到底",而是"整个车流以收费站能承受的节奏,平稳地流动"。
这类问题还有一个共同的麻烦:它在开发和测试时几乎暴露不出来。你自己测,拿几十条数据跑一遍,请求量小得可怜,连速率上限的边都摸不到,串行也好、并发也好,都跑得又快又顺,你会觉得"批量调用 API 不就是个循环嘛"。真正会把问题撑爆的,是上线后的真实规模:几万、几十万条数据,会瞬间把你的调用频率推到 RPM 和 TPM 的上限上,你那个"一股脑发并发"的写法会被真实数据量乘出一场 429 风暴,而长尾的慢请求、中途的任务崩溃,也只有在跑满好几个小时的大任务里才会现形。所以如果你正在写一个需要大批量调用大模型的功能,别等任务在生产里崩了、账单也吓人了,才回头怀疑你的调用方式。在写下第一个循环的时候就想清楚:我的速率上限是多少、我的并发卡在哪、撞了限流我怎么退、慢请求我怎么超时、任务崩了我能不能续跑——把"把功能跑起来"和"让它在真实规模下又快又稳地跑"当成两件必须分别去做的事,这是这篇文章最想留给你的一句话。
—— 别看了 · 2026