2024 年我做一个批量处理功能:要给几万条数据,每一条都调一次大模型 API去做分析。第一版我做得很省事:既然要快,那就开一个线程池,放 100 个 worker,一起往外发请求。本地我拿几十条数据测了测——真快:几十条几秒钟就跑完了。我心里很踏实:"调大模型 API 要快嘛,就是多开几个线程,一起猛发,发得越猛越快。"可等它真正上线、去跑那几万条真实数据,一串问题冒了出来。第一种最先把我打懵:跑了没几分钟,日志里开始刷屏 429 Too Many Requests——我那 100 个线程发出去的请求,绝大部分被服务端直接打了回来。第二种:我心想"被打回就重试呗",于是加了"失败立即重试"——结果更糟了:那些被 429 打回的请求,立刻又一起涌上去,429 反而刷得更凶。第三种:偶尔有一批请求没被 429,但响应慢得离谱——后来发现是其中混了一个超大的请求,一口气把配额吃掉了一大块。第四种最让我哭笑不得:我以为是配额不够,咬牙升级到了更贵的套餐,钱花出去了,速度却几乎没变快。我盯着这一连串问题想了很久才彻底想明白,第一版错在一个根本的认知上:我以为"调 API 要快,就是多开线程一起猛发"。这句话把"发得猛"和"跑得快"当成了一回事。可它不是。大模型 API 的另一头,有一道由 RPM 和 TPM 组成的限流闸门;你发得再猛,超过闸门的部分,只会被 429 原样打回——发得猛,不等于跑得快,只等于被拒得多。真正调好大模型 API,核心不是"开足线程猛冲",而是理解服务端有限流、并在客户端主动把发送速率,平稳地控制在配额之内。这篇文章就把大模型 API 的并发与限流应对梳理一遍:为什么"猛发"是错的、RPM 和 TPM 这两道闸怎么看、怎么用信号量控制并发数、怎么用令牌桶控制速率、429 来了怎么用指数退避加抖动来重试,以及批处理 API、TPM 预估、优先级队列这些把高并发调用真正做对要避开的坑。
问题背景
先把那串问题的现象和我的误判讲清楚,后面所有的设计都是冲着纠正这个误判去的。
现象:开 100 个线程一起狂调大模型 API 之后,上线冒出一串问题:日志刷屏 429 Too Many Requests(请求被服务端限流打回);加了"失败立即重试"后429 更严重(被打回的请求一起涌回去,形成重试风暴);偶尔有批请求慢得离谱(一个超大请求吃光了 token 配额);升级套餐后速度几乎没变(根本问题不是配额,是没有客户端限流)。
我当时的错误认知:"调大模型 API 要快,就是多开线程一起猛发,发得越猛越快。"
真相:大模型 API 的服务端,给每个账号都装了一道限流闸门,它通常由两个维度组成:RPM(Requests Per Minute,每分钟最多多少个请求)和 TPM(Tokens Per Minute,每分钟最多处理多少 token)。你的请求一旦超过任意一个维度的限制,服务端就直接返回 429 把它打回,根本不处理。所以"发得猛"和"跑得快"是两回事:你发出去 100 个请求,服务端配额只够处理 20 个,剩下 80 个不是在排队,是被拒了。真正的快,是在客户端就把发送速率,精确地控制在配额线以内,让请求平稳、不浪费地流过去,而不是一拥而上、撞得头破血流再重试。
要把大模型 API 的高并发调用做对,需要几块认知:
- 为什么"多开线程猛发"是错的——服务端有限流,猛发只换来更多 429;
- 看懂限流——RPM 和 TPM 是两道独立的闸,响应头里能读到余量;
- 控制并发数——用信号量,限制"同时在飞"的请求数量;
- 控制速率——用令牌桶,让请求按配额匀速发出;
- 429 重试、批处理 API、TPM 预估这些工程坑怎么处理。
一、为什么"多开线程猛发"是错的
先把这件最根本的事钉死:你调的大模型 API,不是你独占的一台机器,而是模型厂商按账号、按套餐分配额度的一项共享服务。为了不让某个账号把资源吃垮、为了对所有用户公平,厂商必然会限流。这个限流,是客观存在的物理边界——它不会因为你多开了几个线程就变宽。你开 10 个线程也好、开 100 个线程也好,每分钟能真正被处理的请求数和 token 数,是一个固定的上限。多出来的线程,不会让这个上限变大,只会让"超出上限、被 429 打回"的请求变多。
下面这段代码,就是我那个"上线就刷屏 429"的第一版:
# 反面教材:开一大堆线程,不加任何节制地猛发
from concurrent.futures import ThreadPoolExecutor
from openai import OpenAI
client = OpenAI()
def call_model(text):
resp = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": text}],
)
return resp.choices[0].message.content
def naive_batch(items):
# 破绽:100 个线程,对几万条数据同时无节制地发请求
with ThreadPoolExecutor(max_workers=100) as pool:
return list(pool.map(call_model, items))
# 破绽一:100 个线程的发送速率,远超账号的 RPM / TPM 配额,
# 大量请求会被服务端直接 429 打回。
# 破绽二:这里对 429 毫无处理,被打回的请求直接当成失败丢掉。
# 破绽三:就算补上重试,被打回的一批又会一起涌回去 —— 重试风暴。
这段代码在本地测几十条时表现完美,因为几十条数据、配额绰绰有余,100 个线程根本没机会撞到天花板。它的问题不在代码本身,而在一个被忽略的前提:它默认"我想发多快,就能发多快"。可大模型 API 这一头的速率,根本不由我的线程数决定,而由服务端的配额决定。于是那串问题就有了解释:刷屏 429,是因为100 个线程的瞬时速率,远远冲破了 RPM/TPM 配额;重试更糟,是因为把被拒的请求不加退避地立刻重发,等于火上浇油;升级套餐没用,是因为瓶颈从来不是配额总量,而是我没有任何机制去匹配这个配额。问题的根子清楚了:调好大模型 API 的工程量,全在"承认服务端有一道你必须尊重的闸门"之后——你不主动把速率压到闸门以内,就只能一直撞门。先从看懂这道闸门说起。
二、看懂限流:RPM 和 TPM 两道闸
大模型 API 的限流,通常不是一道闸,而是两道,你的请求必须同时通过它们:
- RPM(Requests Per Minute):每分钟能发起的请求个数上限。它不在乎你每个请求多大,只数个数。
- TPM(Tokens Per Minute):每分钟能处理的 token 总量上限。它把你所有请求的输入 token 加输出 token 累加起来算。
这两道闸是独立的:你可能RPM 还很宽裕,却因为发了几个超长请求,先把 TPM 撞爆了;也可能每个请求都很小,却因为请求数太多,先把 RPM 撞爆了。好在,这道闸门不是黑盒——主流大模型 API 会在每个响应的 HTTP 头里,告诉你当前的配额余量:
import httpx
# 直接发一个原始请求,把响应头里的限流信息读出来
def inspect_rate_limit(api_key):
resp = httpx.post(
"https://api.openai.com/v1/chat/completions",
headers={"Authorization": f"Bearer {api_key}"},
json={"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": "hi"}]},
)
# 这几个响应头,就是服务端实时告诉你的"配额仪表盘"
for h in ["x-ratelimit-limit-requests", # RPM 总额度
"x-ratelimit-remaining-requests", # RPM 剩余
"x-ratelimit-limit-tokens", # TPM 总额度
"x-ratelimit-remaining-tokens", # TPM 剩余
"x-ratelimit-reset-requests"]: # 多久后配额重置
print(f"{h}: {resp.headers.get(h)}")
这里的认知要点是:限流不是一个你只能被动挨打的黑盒——服务端在每个响应头里都给了你一个"配额仪表盘"。你要做的,是把这个仪表盘读进来,让客户端的发送速率,跟着仪表盘走。而"让发送速率跟着配额走"这件事,要从两个角度同时控制:一个是同时在飞的请求数量(对应 RPM),一个是请求发出的速率。先说第一个。
三、用信号量控制并发数
第一个要控制的,是"同一时刻,有多少个请求正在飞"。第一版的问题就是 100 个线程毫无节制。控制并发数,最经典的工具是信号量(Semaphore):你可以把它想成一个停车场,只有固定数量的车位——一个请求要发出,得先抢到一个车位;发完了,把车位让出来。车位满了,后来的请求就乖乖等着,而不是硬挤进去:
import threading
from concurrent.futures import ThreadPoolExecutor
# 信号量:最多允许 8 个请求同时在飞
_concurrency = threading.Semaphore(8)
def call_with_limit(text):
"""每个请求发出前,必须先拿到一个并发名额。"""
with _concurrency: # 拿不到名额就在这里阻塞等待
resp = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": text}],
timeout=30, # 必须设超时,别让一个卡住的请求长占名额
)
return resp.choices[0].message.content
# 离开 with,名额自动归还给下一个等待的请求
def batch_with_semaphore(items):
# 线程池可以开得大一点,真正的并发上限由信号量卡死
with ThreadPoolExecutor(max_workers=32) as pool:
return list(pool.map(call_with_limit, items))
注意这里的一个设计:线程池的 max_workers 可以开得比信号量大(比如 32 个线程、8 个并发名额)。这没问题——真正的并发上限,由信号量这道更窄的闸卡死;多出来的线程,只是在信号量门口排队等名额。这里的认知要点是:控制并发,不是靠"少开线程"这种粗糙的办法,而是靠一个独立的、明确的并发闸(信号量)——它把"并发上限"这个关键参数,从线程池里抽出来,变成一个你能精确调、能单独调的数字。但信号量只管住了"同时几个",它管不住"每秒几个":8 个请求如果个个都在几十毫秒内秒回,那一秒内你照样能发出上百个——RPM 还是会爆。所以还需要第二个角度的控制:速率。
四、用令牌桶控制请求速率
控制速率——也就是"每秒/每分钟最多发出多少个请求"——最经典的算法是令牌桶(Token Bucket)。它的模型很直观:有一个桶,系统按固定速率往桶里放令牌(比如每秒放 10 个);每发一个请求,先从桶里取走一个令牌;桶空了,就得等新令牌补进来。这样,请求的长期平均速率,就被牢牢锁在了"放令牌的速率"上,同时桶的容量,又允许短暂的突发:
import time
import threading
class TokenBucket:
"""令牌桶:把请求速率,匀速控制在 rate 个/秒以内。"""
def __init__(self, rate, capacity):
self.rate = rate # 每秒补充多少令牌
self.capacity = capacity # 桶容量,决定能容忍多大的突发
self.tokens = capacity # 初始装满
self.last = time.monotonic()
self.lock = threading.Lock()
def acquire(self, need=1):
"""取 need 个令牌;不够就阻塞等待,直到够。"""
while True:
with self.lock:
now = time.monotonic()
# 按流逝的时间,把这段时间该补的令牌补进来
elapsed = now - self.last
self.tokens = min(self.capacity,
self.tokens + elapsed * self.rate)
self.last = now
if self.tokens >= need:
self.tokens -= need
return
# 令牌不够,算出还差多少、要等多久
shortage = need - self.tokens
wait = shortage / self.rate
time.sleep(wait) # 注意:在锁外面等待
把令牌桶和上一节的信号量合在一起用,客户端就有了两道互补的闸:
# RPM 配额假设是 600,换算成每秒 10 个,留点余量取 8
_bucket = TokenBucket(rate=8, capacity=16)
_concurrency = threading.Semaphore(8)
def call_governed(text):
"""信号量管"同时几个",令牌桶管"每秒几个",双闸齐下。"""
_bucket.acquire(1) # 第一道闸:速率
with _concurrency: # 第二道闸:并发数
resp = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": text}],
timeout=30,
)
return resp.choices[0].message.content
这里的认知要点是:并发数和速率,是两个不同的维度,要用两个不同的工具分别控制——信号量管住"瞬时同时在飞的数量",令牌桶管住"单位时间累计发出的数量"。两者配齐,你的客户端才能既不超 RPM、又不超并发,平稳地贴着配额线跑。可即便控制得再小心,429 还是可能偶发(配额估算有误差、和别的进程共享了配额)。所以最后一道防线不能少:优雅地处理 429。
五、429 来了:指数退避加抖动重试
当 429 真的发生时,最错误的反应,就是我第一版那样"立刻原样重试"。想象一下:一批请求同时被 429 打回,如果它们都立刻重试,就会在同一瞬间再次一起涌向服务端——这叫重试风暴,它会让限流雪上加霜。正确的做法是指数退避加抖动:每失败一次,等待时间翻倍(1 秒、2 秒、4 秒……),给服务端喘息的时间;同时给等待时间加一个随机抖动,把原本会挤在同一刻的重试,打散到一段时间里:
import random
import time
from openai import RateLimitError
def call_with_backoff(text, max_retries=5):
"""429 时:优先听 Retry-After,否则指数退避 + 抖动。"""
for attempt in range(max_retries):
try:
_bucket.acquire(1)
with _concurrency:
resp = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": text}],
timeout=30,
)
return resp.choices[0].message.content
except RateLimitError as e:
if attempt == max_retries - 1:
raise # 重试到头了,如实抛错
# 优先尊重服务端响应头里的 Retry-After(它最权威)
retry_after = getattr(e, "retry_after", None)
if retry_after:
wait = float(retry_after)
else:
# 没有就用指数退避:1, 2, 4, 8 秒
wait = 2 ** attempt
# 关键:加随机抖动,把扎堆的重试打散开
wait += random.uniform(0, wait * 0.5)
time.sleep(wait)
这段代码有两个关键点。第一,优先听 Retry-After:服务端常常会在 429 响应里明确告诉你"过多少秒再来",这个值最权威,要优先用它。第二,抖动不可省:如果所有失败的请求都退避一模一样的时间,它们只是从"同时撞"变成了"过 N 秒后再一起同时撞"——加上随机抖动,才能真正把它们在时间上散开。下面这张图,把一次"受控调用 + 429 退避"的完整流程串起来:
六、工程坑:批处理、TPM 预估与优先级
五块设计之外,还有几个工程坑,不处理就会让高并发调用要么慢、要么贵、要么乱。坑 1:不赶时间的海量任务,用 Batch API,别用实时接口硬扛。我开头那个场景——几万条数据离线批量处理——其实根本不该用实时的 chat.completions 接口去一条条发。主流厂商都提供 Batch API:你把成千上万条请求打包成一个文件一次性提交,服务端在它空闲时(承诺 24 小时内)慢慢处理,完成后你再来取结果。它的价格通常只有实时接口的一半,而且完全不占你的实时 RPM/TPM 配额:
import json
# 把几万条请求,写成 Batch API 要求的 jsonl 文件
def build_batch_file(items, path="batch.jsonl"):
with open(path, "w", encoding="utf-8") as f:
for i, text in enumerate(items):
line = {
"custom_id": f"req-{i}", # 用来对回结果
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": "gpt-4o-mini",
"messages": [{"role": "user", "content": text}],
},
}
f.write(json.dumps(line, ensure_ascii=False) + "\n")
return path
def submit_batch(path):
# 上传文件并创建批处理任务,之后异步轮询它的状态
f = client.files.create(file=open(path, "rb"), purpose="batch")
batch = client.batches.create(
input_file_id=f.id,
endpoint="/v1/chat/completions",
completion_window="24h",
)
return batch.id
坑 2:发请求前要预估 token,别让一个超大请求撞爆 TPM。这是开头第三个问题的真凶。TPM 是按 token 算的,一个塞了超长文本的请求,可能一个就吃掉几万 token,瞬间把这一分钟的配额掏空一大块。所以对长度不可控的输入,发之前要先估算 token 数,据此从令牌桶里取对应数量的令牌(而不是固定取 1 个),让令牌桶也能反映 TPM:
import tiktoken
_encoder = tiktoken.get_encoding("cl100k_base")
def estimate_tokens(text):
"""粗估一个请求的 token 数(输入加上预留的输出)。"""
input_tokens = len(_encoder.encode(text))
# 给模型的输出也预留一份额度,这里按经验留 500
return input_tokens + 500
def call_token_aware(text):
"""按预估的 token 数,从令牌桶里取等量令牌 —— 让桶反映 TPM。"""
need = estimate_tokens(text)
_bucket.acquire(need) # 大请求取得多、自然发得慢,小请求反之
with _concurrency:
resp = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": text}],
timeout=60,
)
return resp.choices[0].message.content
坑 3:配额紧张时,要让重要的请求优先。当 RPM/TPM 是稀缺资源时,你得想清楚:用户正在前台等待的实时请求,和一个可以慢慢跑的后台批量任务,谁该先用配额?答案显然是前者。所以高并发系统里,常常要给请求分优先级,用一个优先级队列来调度:配额紧张时,先放行高优先级的请求,低优先级的任务往后排。坑 4:配额是账号级的,要算总账。RPM/TPM 配额是整个账号共享的。如果你有多个服务、多个进程都在用同一个 API Key,那它们是在抢同一份配额——你给每个进程单独配的令牌桶速率,加起来绝不能超过账号总配额,否则各自都觉得自己没超,合起来却爆了。坑 5:留好最终降级。哪怕这套限流做得再细,重试到头仍然失败的请求,在海量调用下依然会有。调用方必须接住这个异常,并想好兜底:是记一条日志、稍后进死信队列重跑,还是给用户一个"稍后再试"的友好提示?一个健壮的系统,不是"假设永不被限流",而是"被限流了也能优雅地扛住"。
关键概念速查
| 概念 / 手段 | 说明 |
|---|---|
| RPM | 每分钟请求数上限,只数请求个数 |
| TPM | 每分钟 token 总量上限,累加所有请求的输入输出 token |
| 429 | 请求超出限流配额时服务端的拒绝响应 |
| 限流响应头 | x-ratelimit-remaining 等头实时告知配额余量 |
| 信号量 | 控制同一时刻同时在飞的请求数量 |
| 令牌桶 | 按固定速率发放令牌,把请求速率锁在配额线内 |
| 指数退避 | 每次重试等待时间翻倍,给服务端喘息时间 |
| 抖动 | 给退避时间加随机量,把扎堆的重试打散开 |
| Batch API | 离线批量任务异步处理,价格减半且不占实时配额 |
| token 预估 | 发请求前估算 token 数,防止大请求撞爆 TPM |
避坑清单
- 多开线程不会让配额变大,只会让被 429 打回的请求变多。
- 限流有 RPM 和 TPM 两道独立的闸,必须同时通过。
- 响应头里的 x-ratelimit 字段是配额仪表盘,要读进来用。
- 用信号量控制同时在飞的请求数,并发上限独立可调。
- 用令牌桶控制发送速率,信号量管不了每秒几个。
- 429 别立即原样重试,要指数退避,否则形成重试风暴。
- 退避必须加随机抖动,否则只是推迟扎堆,没有打散。
- 429 响应里的 Retry-After 最权威,要优先采用。
- 不赶时间的海量任务用 Batch API,价格减半不占实时配额。
- 发请求前预估 token,大请求多取令牌,配额是账号级共享。
总结
回头看那串"刷屏 429、重试更糟、大请求拖垮、升级套餐没用"的问题,以及我后来在并发调用上接连踩的坑,最该记住的不是某一个限流参数,而是我动手前那个想当然的判断——"调 API 要快,就是多开线程一起猛发"。这句话错在它把大模型 API 当成了一台我可以独占、可以随意压榨的本地机器。我以为速度的瓶颈在我这边,我多开线程、多压并发,就能更快。可大模型 API 是模型厂商按账号分配额度的共享服务——它的另一头有一道客观存在的限流闸门。我那 100 个线程,没有撞开这道闸门,只是把自己撞得满头是包。发得猛,从来不等于跑得快;在一道有限流的闸门面前,猛冲只换来更多的 429。
所以做大模型 API 的高并发调用,真正的工程量不在"把线程池开多大"那一个数字上。那个数字,谁都会调。真正的工程量,在于你要承认"服务端有一道你必须尊重的闸门",并在客户端,主动把自己的发送行为,精确地约束到这道闸门以内:它限制并发,你就用信号量卡住"同时在飞几个";它限制速率,你就用令牌桶锁住"每秒发出几个";它按 token 算账,你就预估 token、按量取令牌;它偶尔还是会 429,你就用指数退避加抖动体面地重试;而对那些不赶时间的海量任务,你就干脆改用 Batch API,既便宜又不占配额。这篇文章的几节,其实就是顺着这条线展开的:先想清楚"猛发"为什么错,再讲怎么看懂 RPM/TPM、怎么用信号量管并发、怎么用令牌桶管速率、429 来了怎么优雅退避,最后是批处理、token 预估、优先级这几个把高并发调用做扎实的工程细节。
你会发现,调用有限流的大模型 API,和现实里"在一条限速的高速公路上开车赶路"完全相通。一个不懂路况的司机会怎么做?他觉得要快就得猛踩油门,把车开到 200 码(这就是开 100 个线程猛发)。结果呢?一路上全是测速摄像头,一张接一张的罚单(这就是刷屏的 429),被拦下后他还一脚油门冲出去,再被拍一张(这就是重试风暴)。他以为是车不够好,换了辆更贵的跑车(这就是升级套餐),可限速还是那个限速,他还是快不了。而一个真懂行的司机怎么做?他先看清这条路的限速牌(这就是读懂 RPM/TPM),然后把车速稳稳地贴着限速开(这就是令牌桶控速)——不快不慢、一张罚单都不吃,反而是全程最快到达的那个。他还知道,有些不急的大宗货物,根本不必自己开车运,挂个慢车物流就行(这就是 Batch API)。
最后想说,大模型 API 的并发调用做没做对,差距永远不会在"本地测几十条都飞快"时暴露——本地你的调用量小到根本够不着配额的天花板,你会觉得"开足线程猛发"已经是全部。它只在真实的、要处理几万几十万次调用、要长时间贴着配额线跑的线上环境里才显形。那时候它会用最直接的方式给你结账:做不好,你会像我一样,看着日志里 429 刷成瀑布,任务跑了一夜还卡在 10%,你加线程它更慢、你重试它更糟、你花钱升级它纹丝不动;而做对了,你的批量任务会以一种安静而高效的节奏推进:请求平稳地贴着配额线流出去,几乎看不到一个 429,偶尔的限流被退避悄悄吸收,海量的离线任务被 Batch API 用一半的钱在后台默默跑完。所以别等"瀑布般的 429 日志"找上门,在你写下那行 max_workers=100 的那一刻就该想清楚:我面对的不是一台能被我榨干的本地机器,而是一项有限流闸门的共享服务——它的并发、它的速率、它的 token 配额,这一道道闸,我是不是都在客户端替它守住了?这些问题有了答案,你拿到的才不只是一个"线程开得很满"的程序,而是一条真正平稳、高效、扛得住海量调用的可靠管线。
—— 别看了 · 2026