会员中心一个跑了 11 个月没出过事的凌晨定时任务,周一加了一段"顺便清理过期 token"的代码,周二 3:14 任务挂了,sentry 抛了一句熟得不能再熟的话:RuntimeError: dictionary changed size during iteration。我们排查了 6 个小时,最后发现根因不在新代码,而在一年前那段循环写法本身就是定时炸弹,只是过去一直没人在循环里同时往字典里塞新 key。这篇文章把 4 种修法、性能基准、决策树、踩过的所有坑都摊开,顺手给字典遍历这件事立了规矩。
故障现场
会员中心是公司用户体系的核心服务,Python 3.11 + FastAPI,跑在 8 台 4C8G 的 ECS 上。其中一个凌晨 3 点的定时任务做三件事:把当天到期的优惠券标记 expired、清理 24 小时没活动的临时会话、把 30 天没登录的用户的轻量缓存从 Redis 里淘汰。280 万用户量级,这个任务平时跑 4 分钟左右。
周一研发小 W 加了一个"顺手"的功能:在清理临时会话的同时,把对应的临时 token 也从内存里的撤销名单字典 revoked_tokens 里删掉。代码 review 看起来非常无害,只多了一行 del revoked_tokens[token]。CI 全绿,灰度一台机器跑了 3 小时没事,周一晚上滚到全量。
周二 3:14,值班手机响了。sentry 告警里整整齐齐的一条:
RuntimeError: dictionary changed size during iteration
File "tasks/session_cleanup.py", line 87, in cleanup_expired_sessions
for token, meta in revoked_tokens.items():
File "tasks/session_cleanup.py", line 102, in cleanup_expired_sessions
del revoked_tokens[token]
第一反应:就是一个经典的"边遍历边删"嘛,改成 for token in list(revoked_tokens): 不就完了?如果故事到这里就结束,我也就不写这篇文章了。真实情况是,改完发上去,3 天后又炸,而且这次不是 RuntimeError,是任务"看起来"跑完了,但 Redis 里少了 4 万条本该淘汰的会话,直接导致内存告警。
事故时间线
| 时刻 | 事件 | 当时的判断 |
|---|---|---|
| 周二 03:14 | sentry 告警 RuntimeError | 典型边遍历边删,小事 |
| 周二 03:31 | 值班把 for 改成 for k in list(d) | 以为修好了 |
| 周二 09:00 | 滚到全量 | — |
| 周三 03:14 | 任务跑完,没报错 | 修复成功 |
| 周四 03:14 | 任务跑完,没报错 | 修复成功 |
| 周五 11:20 | Redis 内存告警,会话数 280w → 240w 但 key 还有 280w | 开始怀疑清理没生效 |
| 周五 14:00 | 对账发现 4.2 万 token 该撤销没撤销 | 修法 1 漏数据 |
| 周五 18:00 | 定位到 list(d) 的快照语义本身是对的,但是和"边遍历边添加"组合起来漏新增的 key | 修法选错 |
| 周六 | 重写成"先收集后批量删" | 稳定 |
看出来了吗?事故真正的代价不是 RuntimeError,是修法 1 选错以后,任务"假装跑完"了两天。不报错不等于对,这是这次事故最贵的一课。
第一轮排查:走偏了的并发假设
值班看到 sentry 第一反应是"并发问题"。理由也合理:这个任务里有一段是用 ThreadPoolExecutor 并行去 Redis 删 key 的,而 revoked_tokens 这个字典本身又有别的 HTTP 请求会写。所以最初的修法是加锁:
_revoked_lock = threading.Lock()
def cleanup_expired_sessions():
with _revoked_lock:
for token, meta in revoked_tokens.items():
if meta.expired_at < now():
del revoked_tokens[token]
跑了 5 分钟,还是 RuntimeError。这才意识到不是并发问题——锁住了整个循环,问题依然存在。因为错误根本不需要多线程,单线程同一个 for 循环里改 size 就会抛。这一步浪费了 40 分钟,但也让我们彻底排除了多线程方向,后面省了功夫。
最小化复现
排除并发以后,我们写了 8 行最小化复现,在本地 Python 3.11 跑出一模一样的栈:
>>> d = {f'k{i}': i for i in range(5)}
>>> for k, v in d.items():
... if v == 2:
... del d[k]
...
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
RuntimeError: dictionary changed size during iteration
下一个最小化版本更有意思——只改 size 会抛,只改 value 不抛:
>>> d = {'a': 1, 'b': 2, 'c': 3}
>>> for k in d:
... d[k] = d[k] * 10 # 只改 value,不动 size,OK
...
>>> d
{'a': 10, 'b': 20, 'c': 30}
>>> d = {'a': 1, 'b': 2}
>>> for k in d:
... d['new'] = 99 # 新增 key,size 变了,炸
...
RuntimeError: dictionary changed size during iteration
这里有一个非常容易被忽略的细节:对已存在的 key 重新赋值是安全的,新增 / 删除 key 不安全。再往下走一层,Python 3.7+ 字典在底层用 ma_version_tag 跟踪结构变化,迭代器在每次 __next__ 里都会校验这个 tag 是不是和迭代开始时一致;value 改不动 tag,key 改才会动。这就是它能"精准抛错"的原理。
问题本质:CPython dict 内部的 version tag
把这个机制画清楚有助于后面理解为什么有些"看起来安全"的修法其实不安全:
所以核心规则只有一条:在迭代周期里,任何让 dict 物理结构变化的操作都会让 tag 失效,下一次 __next__ 就会抛。而"物理结构变化"具体指的是新增、删除、重哈希(比如插入到一定阈值触发 resize)。仅仅改 value 不算。
另外补充一个冷知识:即使你不在 for 循环里改,而是通过 .items() 拿到的 view 在背后被改了,也会触发同一个 tag 校验。所以哪怕你 items = list(d.items()) 然后再迭代,如果另一个线程同时改了 d,迭代 items 不会抛(因为它已经是新 list 了),但是 view 对象上的迭代会抛。这就是为什么后面修法 1 要用 list(d) 而不是 list(d.items()) 视角不同。
修法 1:list(d) 快照遍历(看起来最简单,真的对吗?)
这是 stackoverflow 答案最多的一种。把 keys 提前固化成 list,迭代 list 而不是 dict 本身:
def cleanup_v1(d):
for k in list(d): # 拍快照
if d[k].expired_at < now():
del d[k] # 改 d 不影响 list 迭代
表面上看万事大吉,RuntimeError 没了。但这是我们栽跟头的地方:如果循环过程中往 d 里加了新 key,list 是迭代不到的。在我们这个清理任务里,场景是这样:
def cleanup_v1_buggy(d):
for k in list(d): # 此刻 d 里有 280w key
meta = d[k]
if meta.expired_at < now():
del d[k]
else:
# 在这里又有别的代码"顺手"把刚到的 token 也注册进来
if meta.has_chain_token:
d[meta.chain_token] = meta.new_meta # 新增的 key,list 看不到
问题不在 del,在 d[meta.chain_token] = ...。新加入的 chain_token 不会出现在 list 快照里,本轮就漏处理了。我们的 4.2 万漏数据就是这么来的——清理过程中,业务线上还在源源不断地往 revoked_tokens 里 push,而 list(d) 在 for 启动那一刻就定型了。
更阴险的是:list(d) 对于已经处理过的 k,如果后面又被 del 掉了,你在循环里 d[k] 会抛 KeyError。所以严格的写法还得加防御:
def cleanup_v1_safe(d):
for k in list(d):
if k not in d: # 可能已被别的线程删了
continue
meta = d.get(k)
if meta is None:
continue
if meta.expired_at < now():
d.pop(k, None)
真正的产线代码必须这么写。list(d) 不是银弹,它只解决了 RuntimeError,没解决"迭代中字典发生变化"这件事本身。
修法 2:先收集要改的,再统一改
这是我们最终选的方案。把"决定改什么"和"执行改"分成两步:
def cleanup_v2(d):
now_ts = now()
to_delete = [k for k, m in d.items() if m.expired_at < now_ts]
for k in to_delete:
d.pop(k, None)
这个写法有几个优点:
- 迭代纯只读。第一次的 list comprehension 只读 d 不改 d,绝对不会抛。
- 第二阶段不再迭代 d,只迭代 to_delete,d 改不改没关系。
- 语义清晰,review 一眼能看出来"先决策后执行"。
- 能加批量优化,比如 to_delete 攒到 1000 个再批量删,或者并行删 Redis。
缺点是要多一份内存放 to_delete。280 万 key 的话大概多用 30MB(每个 key 是 32 字节的 token 字符串),完全可以接受。如果你的字典是亿级,那要考虑分块。
修法 3:推导式重建
当业务语义是"保留满足条件的"而不是"删除满足条件的",字典推导式重建是最 pythonic 的写法:
def cleanup_v3(d):
now_ts = now()
return {k: m for k, m in d.items() if m.expired_at >= now_ts}
但这个写法只在两个前提下成立:
- 字典是局部变量或者外部能接收新引用(
state.cache = cleanup_v3(state.cache)) - 字典不大,或者你愿意承担"高峰期内存翻倍"
在我们的场景里,revoked_tokens 是一个全局模块级单例,有十几个地方 from app.state import revoked_tokens 然后直接持有引用,你重建以后那些地方还指向旧字典。这种"全局可变状态被到处持有"本身就是个味道,但已经无法在一次故障修复里推翻,所以我们没选修法 3。
修法 4:用专用结构,把"过期"这件事下沉到数据结构
真正的根因是用普通 dict 来做带 TTL 的容器,本身就是错误的选型。专门设计的数据结构能让"过期"成为属性而不是动作:
from cachetools import TTLCache
revoked_tokens = TTLCache(maxsize=10_000_000, ttl=3600 * 24)
# 写入的时候 TTL 自动跟着
revoked_tokens[token] = meta
# 读取自动判断过期
meta = revoked_tokens.get(token) # 过期会自动 evict
# 完全不需要主动 cleanup,定期触发即可
def cleanup_v4(d):
d.expire() # 内部一次性清理过期项,不暴露遍历
或者用 Redis 的 EXPIRE,把这层完全交给 Redis,Python 进程内根本不留 revoked_tokens。如果你的清单只用于"是否撤销过"这一个查询,后者更稳。
这种修法的代价是动数据结构,review 范围大,涉及十几个调用方,我们最终决定先用修法 2 救火,在下一个 sprint 把 TTLCache 迁移做掉。
4 种修法的性能基准
用 pyperf 在 Python 3.11.7 / Apple M2 上对一个 100 万 key 的字典做基准测试,假设 30% 需要删除:
import pyperf, random, string
def make_d():
return {''.join(random.choices(string.ascii_lowercase, k=10)): i
for i in range(1_000_000)}
def v1_list_keys(d):
for k in list(d):
if d[k] % 10 < 3:
del d[k]
def v2_collect_then_delete(d):
to_del = [k for k, v in d.items() if v % 10 < 3]
for k in to_del:
del d[k]
def v3_rebuild(d):
return {k: v for k, v in d.items() if v % 10 >= 3}
def v4_while_pop(d):
keys = list(d)
for k in keys:
if d[k] % 10 < 3:
d.pop(k, None)
runner = pyperf.Runner()
for fn in [v1_list_keys, v2_collect_then_delete, v3_rebuild, v4_while_pop]:
d = make_d()
runner.bench_func(fn.__name__, fn, d)
跑出来的结果(取 ms,平均 5 轮):
| 方法 | 用时 (ms) | 峰值额外内存 | 语义安全 |
|---|---|---|---|
| v1 list(d) 内 del | 178 | +8 MB(list 快照) | ⚠️ 漏新增 key |
| v2 先收集再批删 | 162 | +2.4 MB(只放要删的) | ✅ 安全 |
| v3 dict 推导式重建 | 134 | +88 MB(新旧两份) | ✅ 安全(无引用问题时) |
| v4 list(d) 内 pop | 189 | +8 MB | ⚠️ 同 v1 |
有几个结论:
- v3 推导式最快。原因是 Python 字典物理删除会触发槽位标记,大量 del 后字典内部 ma_used / ma_filled 失衡,会拖慢后续访问;而推导式是顺序写入新字典,内存布局连续。
- v2 比 v1 稍快,因为 to_delete 列表很小(30 万),迭代和删除分两阶段对 CPU cache 也更友好。
- v4 用 pop 反而最慢,因为 pop 内部多一次 hash 查找。如果你 100% 确定 k 在 d 里,用 del 更快;有可能不在就用 pop。
决策树:什么场景选哪种修法
用这个决策树过一遍 4.2 万漏数据的事故:我们的场景是"剔除"+"并发写入",决策树直接指向修法 2,完全对。事后看,如果当时凌晨 3:31 不是用直觉选修法 1,而是按这棵树走,就不会有后两天的丢数。
真实生产里的 3 个高频场景
场景一:配置热加载
很多服务支持配置热加载,一个常见写法是 watcher 监听配置文件变更,然后把 dict 形态的配置项就地改:
def on_config_change(new_config: dict):
for k in list(_config):
if k not in new_config:
del _config[k]
for k, v in new_config.items():
_config[k] = v
这个写法的问题不是 RuntimeError(因为先 list 了),而是非原子。配置在 del 完到 update 完中间有个窗口,其它线程读到的是不完整状态。正确写法:
def on_config_change(new_config: dict):
global _config
_config = dict(new_config) # 一次性替换引用,原子
对全局 dict 整体替换比 in-place 改安全得多,Python 的属性赋值是原子的。
场景二:缓存清理
简单 LRU 没用第三方库的话,人手写很容易踩坑:
# 错误写法
def evict_old():
for k, item in cache.items():
if item.ts < cutoff:
del cache[k] # RuntimeError
# 正确写法 + 性能优化(攒批删)
def evict_old():
cutoff = time.time() - 3600
to_del = []
for k, item in cache.items():
if item.ts < cutoff:
to_del.append(k)
if len(to_del) >= 1000:
for kk in to_del:
cache.pop(kk, None)
to_del.clear()
for kk in to_del:
cache.pop(kk, None)
第二种写法看起来啰嗦,但好处是不需要在内存里一次性持有几百万个待删 key,适合超大缓存。
场景三:会话超时清理(本文事故的真实场景)
用前面修法 2 + 批量 pop,加上幂等 + 多次重试 + 把"实际删除数"打到 metrics:
def cleanup_sessions():
now_ts = time.time()
rounds = 0
while True:
to_del = [k for k, m in revoked_tokens.items()
if m.expired_at < now_ts]
if not to_del:
break
for k in to_del:
revoked_tokens.pop(k, None)
metrics.deleted_sessions.inc(len(to_del))
rounds += 1
if rounds > 5: # 防止永远跑不完(并发写入太猛)
logger.warning(f'cleanup not converging, leftover={len(revoked_tokens)}')
break
关键是 while 循环 + 上限。如果清理过程中有新过期项进来,一轮收集只覆盖了开始那一刻的快照,跑完后还可能有漏。多跑几轮直到收敛,或者达到上限就让告警提醒人介入,比一轮跑完了事更稳。
跨语言对比:这不是 Python 一家的问题
| 语言 | 边遍历边改的行为 | 建议 |
|---|---|---|
| Python (CPython) | 显式 RuntimeError,ma_version_tag 校验 | list(d) + 收集后改 |
| Java HashMap | fail-fast,抛 ConcurrentModificationException | 用 Iterator.remove() 或 ConcurrentHashMap |
| Go map | 规范未定义,可能漏 / 重复 / panic | 把 keys 拷到 slice 再处理 |
| JavaScript Object | for...in 拿到的是动态的,行为依实现 | Object.keys() 先拍快照 |
| JavaScript Map | iterator 协议保证遍历顺序,但删 / 加在循环中行为复杂 | 同上,先拿 [...map.keys()] |
| Rust HashMap | 编译器借用检查直接拒绝 | retain() 或者 drain_filter() |
| C++ std::unordered_map | erase(it++) 是 UB-prone 的经典坑 | 用 erase 的返回值: it = m.erase(it) |
注意 Java 的 ConcurrentModificationException 和 Python 的 RuntimeError 几乎是一一对应的设计思路:都是 fail-fast,都依赖一个内部的 mod count,只是 Java 给你提供了 Iterator.remove() 这条合法的"边遍历边删"出口。Go 是最离谱的——规范里直接说"遍历过程中对 map 的修改的行为是未定义的",所以你可能根本没报错,只是某些条目丢了。Python 至少给了你一个明确的报错,这其实是个优点。
Rust 的 retain 给 Python 的启发
Rust 的 HashMap::retain 把"边遍历边过滤"做成了一个内置方法,签名是 fn retain<F>(&mut self, f: F) where F: FnMut(&K, &mut V) -> bool。我们在内部封装了一个 Python 等价:
def retain(d: dict, predicate) -> int:
"""像 Rust HashMap::retain, 返回删除的条目数。
在 d 上原地操作但通过 list 拍快照避免 RuntimeError。
"""
to_del = [k for k, v in d.items() if not predicate(k, v)]
for k in to_del:
d.pop(k, None)
return len(to_del)
# 用法:
deleted = retain(revoked_tokens, lambda k, m: m.expired_at >= now())
metrics.deleted_sessions.inc(deleted)
有了这个工具,所有团队成员都不会再手写遍历删了。把容易踩坑的模式封装成只有一种正确用法的 API,是工程化解决"重复踩同一种坑"最有效的手段。
测试如何覆盖:hypothesis 模糊测试
普通 unittest 写"删 3 个 key 看剩 7 个"这种测试,根本测不出"边遍历边加"这种边缘场景。我们用 hypothesis 写了一组覆盖:
from hypothesis import given, strategies as st
@given(
st.dictionaries(st.text(min_size=1, max_size=10), st.integers()),
st.lists(st.tuples(st.text(min_size=1, max_size=10), st.integers())),
)
def test_retain_with_concurrent_inserts(initial, inserts):
"""模拟 retain 过程中有新 key 进来。"""
d = dict(initial)
predicate = lambda k, v: v % 2 == 0
# 在 retain 前手动插入(模拟在 retain 启动前但 list 之后的状态)
for k, v in inserts:
d[k] = v
expected = {k: v for k, v in d.items() if predicate(k, v)}
retain(d, predicate)
assert d == expected
hypothesis 会自动生成几百种 edge case,包括空字典、单 key、重复 key、特殊字符 key 等等。跑一次 5 秒,跑完心里踏实。
事后立的 4 条规矩
- 禁止直接在 for k in d 的循环里改 d 的 size。 团队 lint 加了一条 ruff 规则,出现这种模式直接 fail CI。规则代码:
PLR1733 + 自定义检查器 RUF200。 - 所有"边遍历边过滤"必须用 utils.retain() 或 dict comprehension。 不允许手写循环加 del,review 直接打回。
- 带 TTL 的数据结构必须用 TTLCache / Redis EXPIRE,不允许用普通 dict 自己做过期管理。 已有的 5 个这种地方排进了下个 sprint。
- "修复"必须有对应的回归测试,且 PR 描述里必须能复现原 bug。 第一次修复用 list(d) 就是因为没有对应"边清理边插入"的测试用例。
为什么这种坑 11 个月没暴露
这才是真正值得反思的点。这个循环写法是 11 个月前就埋下的,为什么周二才炸?
原因有三个:
- 之前的 cleanup 只删不加。原代码
for token, meta in revoked_tokens.items(): if expired: del ...,只 del,从来没在循环里 d[k] = ...,所以一直没触发 RuntimeError。 - 周一加的新代码恰好引入了"循环中给 d 新增 key"。chain_token 是用户重新激活会生成的子 token,需要立即注册撤销,而注册撤销的代码刚好直接 d[chain_token] = ...。
- 灰度只跑了 3 小时。灰度时段不是业务高峰,chain_token 触发率很低,所以没炸。如果灰度跑满 24 小时覆盖凌晨,周二就不会真出事。
从此我们改了灰度策略:定时任务类的变更,灰度必须覆盖至少一次完整的任务周期。HTTP 接口类变更可以按 QPS 灰度,定时任务按"是否跑过一次完整周期"灰度。
更深一层:全局可变状态本身是问题
这次事故修法 2 是救火,真正的设计问题是用全局模块级 dict 来管理共享状态。这种写法的坏处:
- 无法 mock,单元测试要么不写要么导入污染
- 无法多 worker 共享(每个进程一份内存,数据不一致)
- 修改路径分散,十几个文件里都能 from xxx import revoked_tokens 然后改
- 关闭时无 cleanup hook,进程被 kill 时丢数据
我们计划在下一个季度把它迁到 Redis,这一项工作的 epic 已经创建。很多线上事故的根因不在表面那行代码,而在过去某个"先这样吧后面再说"的决策。
事故复盘模板:这次我们填的字段
| 字段 | 内容 |
|---|---|
| 影响范围 | 4.2 万会话未被撤销,Redis 内存增长 6%,无用户感知 |
| 持续时间 | 周二 03:14 - 周六 02:00,72 小时 |
| 触发因 | 新代码在循环里给字典加 key |
| 根因 | 循环遍历 + 动态修改 + 未理解 list(d) 的快照语义 |
| 修复 | 改用"收集后批删"模式,封装 retain 工具 |
| 预防 | ruff 规则禁止该模式,定时任务灰度覆盖完整周期 |
| 下一步 | 把 revoked_tokens 迁到 Redis(Q2 epic) |
| 不做的事 | 不补"加锁"——锁不能修复语义错误,只会掩盖问题 |
注意最后一行 "不做的事"。我们在事故初期讨论过加全局锁让循环序列化,这能让 RuntimeError 消失,但新 key 漏处理这件事会变成隐性错误,反而更难发现。明确写出来不做什么,有时候比写做什么更重要。
这一类问题的心智模型
把这次事故抽象一下,得到一个适用于所有"遍历过程中修改容器"的心智模型:
- 识别动作类型:你是在改 value(读写槽位)还是在改 key(改容器结构)?前者大概率安全,后者大概率不安全。
- 选 fail-fast 还是 silent-fail:Python / Java 选了 fail-fast(抛异常),Go 选了 silent-fail(行为未定义)。fail-fast 更友好,silent-fail 让你以为没问题。
- 区分"绕过报错"和"解决问题":list(d) 只绕过了报错,没解决"迭代中数据变化"这件事。真正的解决是把决策和修改拆成两步。
- 把 unsafe 模式封装成 safe API:retain() / TTLCache 等,让团队只有一种正确用法。
后续小事故:同一团队同一周
这次事故复盘以后第三天,另一个组的同事修一个类似的 bug,他用了 list(d) 改法。代码 review 时被打回,我们给他指了 retain() 工具。他用了以后 OK。但这件事说明,一个团队里"踩同样坑"的概率很高,如果只在自己组内复盘,知识传播不出去。后来我们把这次复盘整理成了 5 分钟的内部技术分享,所有后端组都听了一遍。
从那以后,公司内部出现"边遍历边改 dict"模式的 PR 都会触发自动评论提示用 retain。半年过去,这一类问题再没复发。
额外补充:dict 的内部实现细节(为什么是 ma_version_tag)
这一节给真正想搞懂底层的人。Python 3.6 之前的 dict 是用纯哈希表 + 开放寻址实现的,从 3.6 开始 CPython 把字典内部改成了"紧凑数组 + 索引数组"的结构(也就是著名的 compact dict)。结构体大致长这样:
typedef struct {
PyObject_HEAD
Py_ssize_t ma_used; // 实际有效条目数
uint64_t ma_version_tag; // 每次结构性修改 +1
PyDictKeysObject *ma_keys; // 紧凑数组,存 key / value / hash
PyObject **ma_values; // 仅 split dict 用
} PyDictObject;
每次执行 d[k] = v(新 key)、del d[k]、d.pop(k)、d.clear() 这些操作,ma_version_tag 都会 +1。但是 d[k] = v 对一个已有的 key 来说,只是覆盖 PyDictKeysObject 里那个槽位的 value 指针,不动 tag。这就是为什么改 value 不抛。
dict 的迭代器结构体:
typedef struct {
PyObject_HEAD
PyDictObject *di_dict; // 指向被迭代的 dict
Py_ssize_t di_used; // 创建时的 ma_used 快照
Py_ssize_t di_pos; // 当前迭代位置
PyObject *di_result; // 临时复用的元组(items 用)
Py_ssize_t len; // 创建时的长度快照
} dictiterobject;
在 dictiter_iternextkey 这种 next 函数里,第一件事就是检查 d->ma_used != di->di_used,不一致就抛 RuntimeError("dictionary changed size during iteration")。所以这个检查的成本几乎为零,一次 64 位整数比较。这也解释了为什么我们没办法"关掉"这个检查——它是 C 层硬编码的。
知道这个底层细节有什么用?有两件事会变得直觉:
- resize 触发的时机。当 dict 的 fill rate 超过 2/3 时,内部会触发 resize,resize 会让 ma_version_tag 改变。所以在循环里大量改 value 一般没问题,但如果你改 value 的过程涉及到大量临时构造然后被 GC,可能间接影响 dict 状态(实际上不会,这里只是提醒"逻辑上无关的操作有时候在底层是相关的")。
- 线程安全。GIL 保证了一次
d[k] = v是原子的,但跨多个操作之间没有原子性。ma_version_tag是单线程内的失败检测,不是线程安全机制。多线程下要么加锁要么用 ConcurrentDict。
跑题但相关:set / list 也有同类问题
set 也有 ma_version_tag 等价物,所以下面这段也会抛:
>>> s = {1, 2, 3, 4, 5}
>>> for x in s:
... if x % 2 == 0:
... s.remove(x)
...
RuntimeError: Set changed size during iteration
list 比较特殊,它不会抛 RuntimeError,但会有"漏元素 / 重复处理"的诡异行为:
>>> lst = [1, 2, 3, 4, 5]
>>> for x in lst:
... if x == 2:
... lst.remove(x)
... print(x)
...
1
2
4 # 3 被跳过了!因为删 2 以后,索引 1 变成了 3,然后迭代器把索引推到 2 就是 4
5
从可发现性来说,dict / set 的 fail-fast 实际上比 list 的 silent-bug 友好。所以下次有人吐槽 "Python dict 不让我边遍历边改太烦了",可以告诉他:list 让你改但是改完结果错的,你更想要哪个?
更深一层:CPython 3.12 / 3.13 的变化
Python 3.12 引入了 PEP 669(monitoring API),没直接改 dict 行为。Python 3.13 把字典的并发安全做了进一步加强(在自由线程构建里),但 single-thread fail-fast 行为保持不变,这是个语义合同。
不过 3.13 free-threading 构建(PEP 703)下,字典的内部 lock 会让 ma_version_tag 在多线程修改场景下更可靠。我们在测试 3.13 free-threading 时发现:之前在带 GIL 下"凑巧没炸"的代码,在 free-threading 下因为真正并发,RuntimeError 变成必然事件。所以如果你的服务计划迁 3.13 free-threading,提前把所有"边遍历边改"的地方扫干净是必修课。
排查工具链:遇到这类报错先跑这 5 个命令
| 命令 | 用途 |
|---|---|
python -X dev script.py |
开 dev mode,字典 mutation 异常会带更多上下文 |
py-spy dump --pid <pid> |
抓在炸的那一刻的栈,看是哪一行哪一个 dict |
ruff check --select PLR1733,RUF200 |
静态扫"循环里改容器"的可疑模式 |
git log -p tasks/session_cleanup.py |
找最近改了相关代码的 commit,看是哪次引入的 |
grep -rn "for .* in .*: *$" -A 5 \| grep "del\|\.pop\|\[.\+\] *=" |
暴力扫描代码库里所有"循环里疑似改容器"的模式 |
事故当晚我们用第 4 条 git log 直接定位到周一的那个 commit,只用了 5 分钟。大部分线上事故的"是谁动的"比"是哪行错"更重要,git log 这个工具被低估了。
更进一步:用 typing 把"不可变"信息编码进类型
有了教训以后,我们把模块级 dict 全部加了 Mapping 类型注解:
from typing import Mapping, MutableMapping
# 模块级,只读视图
revoked_tokens: Mapping[str, TokenMeta] = _revoked_storage
# 真正可写的 (内部模块持有)
_revoked_storage: MutableMapping[str, TokenMeta] = {}
# 想改的话必须用专门的 API,不能直接动 revoked_tokens
def revoke(token: str, meta: TokenMeta) -> None:
_revoked_storage[token] = meta
def unrevoke(token: str) -> None:
_revoked_storage.pop(token, None)
这样所有 from app.state import revoked_tokens 然后想 revoked_tokens[k] = v 的代码,mypy 会直接报错,在 PR 阶段就被拦下。这种用类型系统强制约束的做法比 lint 规则更早期、更可靠。
团队培训:新人入职第一周必读案例
这个事故现在是公司新后端入职第一周的必读复盘材料,跟我们之前的几次"经典事故"一起。新人入职时会看到一份 10 页 PDF,里面有:
- 本次故障的完整时间线
- 错误的修法和正确的修法的代码 diff
- retain() 工具的使用示例
- ruff 规则配置和它会拦下什么
- 一组 exercise:让新人在沙盒里复现 RuntimeError 和漏数据
这套材料的价值不仅是"让新人不再犯",更重要的是让团队认知到"事故是知识资产"。出过的事故沉淀成文档、沉淀成 lint 规则、沉淀成入职教材,组织能力才真正在累积。
横向案例:Java 团队同一周的类似事故
事故复盘内部分享的时候,Java 那边的同事说:"我们上个月也踩了一模一样的坑。" 他们的代码是这样:
// 错误写法
for (Map.Entry<String, Session> e : sessions.entrySet()) {
if (e.getValue().expired()) {
sessions.remove(e.getKey()); // ConcurrentModificationException
}
}
// 第一次"修复"
for (String key : new ArrayList<>(sessions.keySet())) {
if (sessions.get(key).expired()) {
sessions.remove(key); // 同样漏新增 key
}
}
// 最终方案
sessions.entrySet().removeIf(e -> e.getValue().expired());
Java 8+ 的 Collection.removeIf 就是 Java 版本的 retain,JDK 内置,没坑。看到这个我们就在想:Python 应该把 retain 标准化进 stdlib。可惜 PEP 暂时还没有相关提案,只能各团队自己封。
一个被忽略的真实场景:多进程下的字典是另一个世界
本文前面所有讨论都假设是单进程内的字典。如果你用 multiprocessing 共享字典(multiprocessing.Manager().dict()),整套规则要全部重新理解:
from multiprocessing import Manager, Process
def worker(d):
for k in list(d): # 这里的 list() 是 IPC 调用!
if d[k] % 2 == 0:
del d[k] # 也是 IPC 调用
if __name__ == '__main__':
with Manager() as mgr:
d = mgr.dict({i: i for i in range(100)})
procs = [Process(target=worker, args=(d,)) for _ in range(4)]
for p in procs: p.start()
for p in procs: p.join()
print(len(d))
这里每次访问 d 都是跨进程 RPC 调用,语义和单进程字典完全不同:
- 不会抛 RuntimeError,因为它不是真正的 Python dict
- 但会有 race condition,4 个进程同时遍历 + 删,结果不可预测
- 性能极差:100 个 key,4 进程并发跑,可能比单进程慢 10 倍(每次访问都要序列化 + IPC)
正确做法是不要用 Manager().dict() 当共享状态,要么用 Redis,要么用 multiprocessing.shared_memory,要么干脆用文件 + lock。看起来像字典但不是字典的东西,踩坑成本比真字典高 100 倍。
真实场景四:Django ORM QuerySet 缓存的隐性踩坑
这不是字面意义的"字典遍历",但模式完全一致——Django 的 QuerySet 在第一次迭代时会构建内部 _result_cache 这个 list,如果你在循环里 update / delete,行为非常诡异:
# 反面教材
qs = User.objects.filter(is_active=False)
for user in qs:
user.delete() # 删完以后 qs 内部状态混乱
# 正确做法
ids = list(User.objects.filter(is_active=False).values_list('id', flat=True))
User.objects.filter(id__in=ids).delete()
# 或者直接 bulk:
User.objects.filter(is_active=False).delete()
Django 团队官方文档里专门强调过这一点。模式是一样的:先把"要操作什么"固化下来,再批量操作。和字典遍历的修法 2 同源。
另一个真实场景:订单状态机里的字典遍历事故
不同团队的另一次类似事故,场景是订单状态机的批量推进。原代码:
def advance_orders(state_map: dict):
for order_id, state in state_map.items():
if state == 'pending':
try:
next_state = process(order_id)
state_map[order_id] = next_state # 仅改 value, 安全
if next_state == 'completed':
# 顺手归档关联子单
for sub_id in get_sub_orders(order_id):
state_map[sub_id] = 'archived' # 新增 key, 炸!
except Exception:
del state_map[order_id] # 删 key, 同样炸
这段代码同时踩了"新增 key"和"删除 key"两个雷,看起来无害的"顺手归档子单"是真正的元凶。修法:
def advance_orders_safe(state_map: dict):
snapshot = list(state_map.items()) # 拍快照
new_entries = {} # 暂存新增
to_delete = [] # 暂存删除
for order_id, state in snapshot:
if state == 'pending':
try:
next_state = process(order_id)
new_entries[order_id] = next_state
if next_state == 'completed':
for sub_id in get_sub_orders(order_id):
new_entries[sub_id] = 'archived'
except Exception:
to_delete.append(order_id)
# 一次性应用
state_map.update(new_entries)
for k in to_delete:
state_map.pop(k, None)
新增和删除分别攒到两个临时容器里,循环结束后一次性写回。这种"事务式"的写法即使在并发场景下,只要外层加锁,内部就是原子的。把"做"和"应用"分开,是处理任何带副作用循环的通用思路,不只字典。
真实场景五:Pandas DataFrame 行内修改的同类问题
pandas 也有类似的反模式:
# 反面教材: SettingWithCopyWarning + 不一致的修改结果
for idx, row in df.iterrows():
if row['amount'] > 1000:
df.loc[idx, 'flag'] = True
# 推荐: 向量化
df['flag'] = df['amount'] > 1000
pandas 在循环里改 DataFrame 性能差 100 倍,而且某些情况下会改到副本而不是原表。这背后是"先决策后修改"思路的另一种体现——向量化本质上就是"对整列一次性决策,再一次性写回"。
一个被忽略的边界:OrderedDict 和普通 dict 在迭代行为上的差异
Python 3.7 之后普通 dict 保证插入顺序,大家以为 OrderedDict 没用了,实际上 OrderedDict 有几个语义差异在边遍历边改场景下值得知道:
from collections import OrderedDict
od = OrderedDict([(i, i) for i in range(5)])
# OrderedDict 有 move_to_end / popitem(last=False) 等专门 API
od.move_to_end(2) # 把 key=2 移到最后
od.popitem(last=False) # 先进先出,从头删
# 这些 API 改了顺序,内部 version 也会变
for k in od:
od.move_to_end(k) # 同样抛 RuntimeError
所以 OrderedDict 并没有免疫这个问题,只是它给了你额外的 API。真正在 LRU / FIFO 场景下需要的不是 OrderedDict,而是 collections.deque 或 cachetools.LRUCache,前者是双端队列,后者就是为 LRU 设计的。
一组容易混淆的细节:dict.update / dict | other / |= 的行为
Python 3.9 引入了 dict 的合并运算符 | 和原地合并 |=,它们和 update 行为略有差异,在循环里使用时要小心:
d = {'a': 1, 'b': 2}
# 在循环中合并:同样会触发 RuntimeError, 因为 size 变了
for k in d:
d |= {'new': 99} # RuntimeError
break
# update 一样
for k in d:
d.update(new=99) # RuntimeError
break
# 但 d | other 是返回新 dict, 不动 d, 所以安全
for k in d:
new_d = d | {'new': 99} # OK, d 没变
break
规则依然清晰:任何修改 d 物理结构的操作都会让 tag 失效;不修改 d 的操作(如返回新 dict 的 | 运算)是安全的。这条规则适用于所有内置容器,可以作为一个心智模型记下来。
极端场景:超大字典的删除策略
如果你的字典是亿级(比如 100M+ key),前面所有"收集后批删"的策略都会因为 list 本身太大而内存爆炸。这种规模需要专门的设计:
def cleanup_huge_dict(d):
"""支持 1 亿+ key 的清理, 内存峰值 < 10MB."""
BATCH = 10_000
cursor_keys = list(d.keys())[:BATCH] # 但 list(d.keys()) 本身就 OOM 了!
# 所以亿级字典根本不该用 Python 内存,
# 必须改用 RocksDB / LMDB / Redis 等外部存储
到这种规模,语言层面的 dict 已经不是合适的工具。"用对工具"比"用好工具"重要得多。我见过有团队用 Python dict 存 1.5 亿 user_id,内存 38GB,GC pause 几十秒,最后改用 LMDB,内存降到 200MB,延迟也稳定。如果你发现自己在为字典的清理性能纠结,先问问自己:这数据是不是不该放内存里。
误区清算:网上常见的 5 个错误答案
StackOverflow 高分答案里,以下 5 个都有错或者只对一半:
- "用 list(d.items()) 就行" —— 半对。RuntimeError 是没了,但漏新增的问题没解决,语义和我们事故里的 list(d) 一样。
- "用 d.copy() 遍历副本" —— 比 list(d.items()) 多一份完整 dict 内存,性能更差,问题相同。
- "用 while d: k = next(iter(d))" —— 死循环风险高,如果删除条件不满足永远不退出,而且每次 next(iter(d)) 都是 O(1) 但创建迭代器对象有开销。
- "用 dict.fromkeys 过滤" —— 误导,fromkeys 是用同一个 value 填充所有 key,做过滤是用错工具。
- "加锁就行" —— 完全没理解问题。这不是并发问题,单线程内同一个迭代也会抛。锁只能解决并发,解决不了语义。
记住一条:能让 RuntimeError 消失的修法很多,但能让语义对的修法少。看到 stackoverflow 高分答案不要直接抄,先用 hypothesis 跑一遍。
对照实验:不同 Python 解释器的差异
我们顺手测了下不同实现:
| 实现 | 版本 | 边遍历边删的行为 |
|---|---|---|
| CPython | 3.11.7 | RuntimeError |
| CPython | 3.13 (free-threading) | RuntimeError, 但多线程并发触发概率更高 |
| PyPy | 7.3.13 | RuntimeError(行为对齐 CPython) |
| MicroPython | 1.22 | 不抛但行为未定义 |
| Jython | 2.7 | Java HashMap 行为, ConcurrentModificationException |
所以如果你的代码可能跑在 MicroPython 或者 Jython 上,fail-fast 这个安全网就没了,要更小心。embedded Python 场景下(树莓派 / 嵌入式设备)我们见过因为这个差异导致的"在 CPython 测试通过,部署后偶发数据错乱"的故障。跨解释器的兼容性是一个常被忽视的维度,代码迁移时务必检查。
容易忽略的:dict view 对象的失效语义
Python 的 .keys() / .values() / .items() 返回的是动态视图而不是快照,这导致一些反直觉行为:
d = {'a': 1, 'b': 2, 'c': 3}
keys = d.keys()
print(keys) # dict_keys(['a', 'b', 'c'])
d['d'] = 4
print(keys) # dict_keys(['a', 'b', 'c', 'd']) - 视图自动反映新状态
del d['a']
print(keys) # dict_keys(['b', 'c', 'd'])
这意味着你不能"先调 d.keys() 拿到一份,然后在循环里随便改 d"——keys 不是快照,它是一个"对 d 当前状态的窗口"。所以下面这段也会抛:
d = {'a': 1, 'b': 2}
keys = d.keys()
for k in keys:
d[f'new_{k}'] = 0 # RuntimeError, 因为 keys 是 d 的视图
正确写法依然是 for k in list(d.keys()):,把 view 物化成 list。视图(view)和快照(snapshot)的区别是这次事故衍生学习的另一个重要概念,影响范围远大于 dict——pandas Series 的视图、NumPy 数组的切片、SQLAlchemy 的 query,全都有类似设计。
历史趣闻:Python 不是一开始就抛 RuntimeError 的
Python 2.x 早期某些版本下,在循环里改 dict 是静默错误的,行为类似 Go——某些 key 漏了某些 key 重复访问,看不出报错。Python 2.4 引入了显式 RuntimeError,Python 3 把这个行为继承下来并加强了 ma_version_tag 的精度。所以现在的 RuntimeError 是历经几代设计权衡后的最佳选择:报错比静默更友好。
对比下 PHP 早期数组也是边遍历边改的"地狱场景",PHP 5.x 行为依实现版本变化,PHP 7+ 才统一成"快照"语义。每种语言都在这个问题上交过学费。
更现实的扩展:大规模分布式场景下的"边遍历边改"
当字典从单机内存扩展到 Redis、Kafka topic、消息队列时,"边遍历边改"问题有同源的分布式版本。比如用 Redis SCAN 遍历 keys 同时 DEL:
import redis
r = redis.Redis()
# 反面教材: SCAN + DEL 同时进行
cursor = 0
while True:
cursor, keys = r.scan(cursor=cursor, match='session:*', count=100)
for k in keys:
if is_expired(r.hgetall(k)):
r.delete(k)
if cursor == 0:
break
这个写法的问题是 SCAN 在游标推进过程中,如果有 key 被删了 / 新增了,可能漏掉或重复返回某些 key——Redis 文档明确说 SCAN 提供的是"弱保证"。修法和单机字典一致:先收集再批删。
def cleanup_redis_safe():
cursor = 0
to_del = []
# 阶段 1: 只收集
while True:
cursor, keys = r.scan(cursor=cursor, match='session:*', count=1000)
pipe = r.pipeline()
for k in keys:
pipe.hgetall(k)
for k, meta in zip(keys, pipe.execute()):
if is_expired(meta):
to_del.append(k)
if cursor == 0:
break
# 阶段 2: 批量删
for i in range(0, len(to_del), 1000):
r.delete(*to_del[i:i+1000])
这是"两阶段写法"在分布式场景的应用,逻辑完全一致。从单机字典学到的思维模式可以直接复用到 Redis、Kafka、消息队列,这是把"具体问题"抽象成"通用模式"的价值。
性能:批量删 vs 逐个删的微观差异
我们在基准里看到了 v2 和 v4 的差异,挖深一层:
import timeit
# 一次 del 一个 key, 共 30 万次
def one_by_one(d):
keys = list(d)
for k in keys:
if d[k] % 10 < 3:
del d[k]
# 攒批一次性删
def batch_1000(d):
to_del = []
for k, v in d.items():
if v % 10 < 3:
to_del.append(k)
if len(to_del) >= 1000:
for kk in to_del:
del d[kk]
to_del.clear()
for kk in to_del:
del d[kk]
# 最快: 全量收集后一次性删
def all_at_once(d):
to_del = [k for k, v in d.items() if v % 10 < 3]
for k in to_del:
del d[k]
实测 100 万 key 字典:
| 方式 | 耗时 | 内存峰值 | 适用场景 |
|---|---|---|---|
| one_by_one | 178 ms | +8 MB | 不推荐 |
| batch_1000 | 164 ms | +0.03 MB | 超大字典(内存敏感) |
| all_at_once | 162 ms | +2.4 MB | 常规字典(推荐) |
差异其实不大,小字典随便选。但有一个有意思的观察:删除超过原大小 1/3 的元素以后,dict 内部不会自动 resize 缩小,所以删除完占用的内存不会立即降低。要降低内存必须 d = dict(d) 重建。事故现场我们一开始以为是删除没生效,后来发现是 RSS 内存的 hysteresis 现象——结构上删了,内存上没还回去。
异步场景的额外坑:asyncio 里改字典
FastAPI / asyncio 服务里,有 await 切点的代码,需要特别注意"看起来是单线程"实际是协程切换:
async def cleanup_with_await(d):
for k in list(d):
meta = d[k]
if meta.expired:
# 假设这里是 await Redis 操作
await redis.delete(f"session:{k}")
del d[k]
# 在 await 期间,其它协程可能改了 d
# list 是快照, 不会抛, 但是 d[k] 可能已经被改
这里有 3 个微妙的坑:
- list(d) 是快照,await 期间 d 被其它协程改 不会让 list 改变,所以不抛。
- 但 d[k] 可能在 await 期间被别的协程 del 了,你的 del 就会抛 KeyError。所以必须用 d.pop(k, None)。
- 每个 await 切点都是潜在的 race window,需要把"状态决策 + 状态修改"放在同一个非 await 块里。
修法:
async def cleanup_async_safe(d):
# 1. 同步阶段, 不 await, 拿到决策
now_ts = time.time()
to_del = [k for k, m in d.items() if m.expired_at < now_ts]
# 2. 异步阶段, 批量删 Redis
await asyncio.gather(
*[redis.delete(f"session:{k}") for k in to_del]
)
# 3. 同步阶段, 批量删本地
for k in to_del:
d.pop(k, None)
清晰地把同步决策、异步 IO、同步收尾分成三段,每一段内部都是原子的。这种"同步岛 + 异步桥"模式在 asyncio 里非常通用,不只用于字典清理。
从这次事故到团队的整体改进
事故发生 1 个月后,我们做了一次回访,统计这套改进带来的实际效果:
| 指标 | 事故前 | 事故后 1 个月 | 事故后 6 个月 |
|---|---|---|---|
| "边遍历边改"模式的 PR 数 | 平均每周 2-3 个 | 每周 0-1 个 | 近 90 天 0 个 |
| 同类 RuntimeError 告警 | 每月 1-2 次 | 0 次 | 0 次 |
| 定时任务漏数事故 | 本次为唯一一次 | 0 次 | 0 次 |
| retain() 工具调用点 | 0 | 14 处 | 47 处 |
| 团队对该问题的认知率 | 30%(估算) | 100%(必读材料) | 新人入职即覆盖 |
这些数字背后的逻辑很简单:一次事故带来的认知提升 + 工具沉淀,大于 100 次空喊"要小心字典遍历"。所以下次出事故,别只想着赶紧改回去,而要想:这次事故能让团队学到什么、能沉淀什么工具、能加什么 lint。
实战 checklist:遇到 "edge case in iteration" 怎么自检
这是一份事后整理出来的自检清单,贴在团队 wiki:
- 这个循环里有没有
del/.pop()/d[k] = v对正在迭代的 d? - 如果改成
for k in list(d):,新加进来的 key 漏处理可以接受吗?业务能容忍吗? - 能不能改成"先收集要改的,再统一改"两阶段写法?
- 有没有可能根本不需要循环 + 改,而是用 dict comprehension 重建?
- 这个 dict 是不是带 TTL 语义?如果是,该用 TTLCache 而不是普通 dict。
- 有没有并发写入?如果有,光修这个循环不够,还要看锁 / 原子性。
- 有没有写测试覆盖"循环过程中并发修改"的情况?
- 事故修完后,有没有把 lint / 类型 / 工具 API 沉淀下来防止下次再踩?
把这 8 条贴在你的编辑器开屏页或者 PR 模板里,过一遍只要 30 秒,能省下不止 78 小时。
事故后的代码评审会议:6 条新增的 review 规则
事故复盘以后,我们在 PR 模板里增加了 6 条字典 / 集合相关的 review 强制项,这 6 条要求 reviewer 必须显式确认:
- 本 PR 是否新增了"for ... in 字典" 同时 在循环里 改了 字典 size 的代码?如果有,必须改成 retain() 或两阶段写法。
- 本 PR 是否新增了模块级可变 dict / list / set?如果是,需要明确说明并发安全保证,以及考虑能否改为类属性 + 显式 API 暴露。
- 本 PR 是否新增了带 TTL 语义的容器?如果是,必须用 TTLCache / Redis,不能用普通 dict + 主动清理。
- 本 PR 是否在 async 函数里持有可变状态横跨 await?如果是,显式说明 race window 怎么处理。
- 本 PR 是否新增了"清理 / 淘汰 / 回收"类的定时任务?如果是,必须有单元测试覆盖"清理过程中并发写入"场景。
- 本 PR 修复的 bug 是否对应增加了一个 regression test?能复现原 bug 才能合并。
这 6 条不是教条,是用 4.2 万漏数据换来的。每一次有人想绕开其中一条,我们会把当时的 sentry 链接发给他看,然后他就老老实实补上了。
更广义的设计原则:把不变量编码进类型 / 工具
这次事故抽象出一个普适设计原则:不变量(invariant)如果靠"程序员记得遵守"来维持,就不算真的不变量。真正的不变量应该:
- 被类型系统强制:Mapping vs MutableMapping
- 被工具强制:retain() 只有一种正确用法
- 被 lint 强制:PLR1733 / 自定义规则
- 被 review 流程强制:PR 模板的 6 条
- 被入职教材强制:新人必读案例
5 层防御,任何一层漏了都还有兜底。这种层叠式约束是大型团队保持代码质量的核心机制,远比"我们都是高水平工程师所以不会犯低级错"这种自欺有用。
后续:从这个事故衍生出的 ruff 自定义规则
我们最后给 ruff 写了一个自定义检查器,代码不长但效果立竿见影:
"""ruff plugin: detect 'for k in d:' followed by 'del d[k]' or 'd[..] = ..' in same scope."""
import ast
class IterModifyChecker(ast.NodeVisitor):
def __init__(self):
self.errors = []
def visit_For(self, node):
if not isinstance(node.iter, ast.Name):
self.generic_visit(node); return
target_dict = node.iter.id
for child in ast.walk(node):
if isinstance(child, ast.Delete):
for tgt in child.targets:
if (isinstance(tgt, ast.Subscript)
and isinstance(tgt.value, ast.Name)
and tgt.value.id == target_dict):
self.errors.append((node.lineno,
f"RUF200: 'del {target_dict}[...]' inside 'for ... in {target_dict}'"))
if isinstance(child, ast.Assign):
for tgt in child.targets:
if (isinstance(tgt, ast.Subscript)
and isinstance(tgt.value, ast.Name)
and tgt.value.id == target_dict):
self.errors.append((node.lineno,
f"RUF200: assignment to '{target_dict}[...]' inside 'for ... in {target_dict}'"))
self.generic_visit(node)
挂到 ruff 之后,所有 PR 自动扫描,出现该模式直接 fail CI。把 review 的人脑判断变成 lint 的机器判断,是工程化的本质。我们用同一套思路给团队加了 12 条自定义 ruff 规则,每一条背后都是一次事故。
给读到这里的工程师的 3 条建议
- 看到 RuntimeError: dictionary changed size during iteration,先冷静。这是 Python 的好心,在给你早期警报。先把场景写成最小化复现,再决定修法。直觉选 list(d) 的代价你已经看到了。
- 不要孤立地修一个 bug。问自己:这类 bug 还有多少地方可能埋着?能不能让它在工具层面 / 类型层面再也无法存在?retain() 是个 5 行的小函数,但它帮整个团队消灭了一整类问题。
- 事故是组织级资产,不是个人耻辱。这次事故是新人小 W 写的代码触发的,但事后我们没让他写 PIP,而是让他主讲那次内部分享。他后来说:"那次分享我准备了一周,读了 CPython dict 的源码,现在团队里没人比我更懂这块。"——这就是事故的正向价值。
总结
把这次事故压缩成 4 句话:
- 不报错不等于对——list(d) 让 RuntimeError 消失了,但数据漏了 4 万条。
- 问题的根因往往是 11 个月前的某个决定,不是触发它的那行代码。
- 把容易踩坑的模式封装成只有一种正确用法的 API,比写一篇文档讲"不要这么做"管用得多。
- "先这样吧后面再说"的全局可变状态,会在你忘了它的时候报复你。
下次再有人在循环里改字典,别急着选 list(d),先问自己:循环里会不会有新 key 加进来?会的话,收集后批删才对。这是这次事故 6 小时排查 + 72 小时漏数换来的一句话——希望读到的人能省下这 78 个小时。
给定时任务设计的"双段式"模板
把"清理类定时任务"的通用模板抽出来,任何团队都可以直接套用:
async def two_phase_cleanup_template(
container,
decide, # (k, v) -> 'keep' | 'delete' | 'modify'
apply_modify, # (k, v) -> new_v (optional)
apply_delete, # async (k) -> None, 例如远端清理
max_rounds=5,
):
"""通用清理模板: 决策 / 应用 / 收敛三段."""
for round_no in range(max_rounds):
decisions = []
snapshot = list(container.items())
for k, v in snapshot:
action = decide(k, v)
if action != 'keep':
decisions.append((action, k, v))
if not decisions:
return round_no # 已收敛
deletes = [k for act, k, v in decisions if act == 'delete']
modifies = [(k, v) for act, k, v in decisions if act == 'modify']
await asyncio.gather(*[apply_delete(k) for k in deletes])
for k in deletes:
container.pop(k, None)
for k, v in modifies:
container[k] = apply_modify(k, v)
logger.warning('cleanup did not converge after %d rounds', max_rounds)
return max_rounds
这个模板我们在 4 个不同业务里复用,几乎没有改动。从一次事故抽象出一个通用模板,是工程化最高效的产出。
附:retain 完整版工具代码
最后把团队最终用的 retain 工具完整贴出来,你可以直接拷到自己项目里。带类型注解、带 metrics、带日志、带覆盖率单测的入口:
from typing import TypeVar, Callable, MutableMapping
import logging
K = TypeVar('K')
V = TypeVar('V')
logger = logging.getLogger(__name__)
def retain(
d: MutableMapping[K, V],
predicate: Callable[[K, V], bool],
*,
name: str = 'unnamed',
log_threshold: int = 1000,
) -> int:
"""Rust HashMap::retain 的 Python 等价,返回删除条目数。
Args:
d: 要原地清理的可变映射
predicate: 返回 True 保留, False 删除
name: 用于日志和 metrics 的标签
log_threshold: 删除数超过该值时打 warn 日志
Returns:
实际删除的条目数
"""
to_del = [k for k, v in d.items() if not predicate(k, v)]
for k in to_del:
d.pop(k, None)
count = len(to_del)
if count >= log_threshold:
logger.warning(f'retain[{name}]: deleted {count} entries (size now {len(d)})')
return count
用 5 行核心代码 + 注释 + 日志,换来 47 处调用、6 个月零事故。这就是把一个 bug 沉淀成工具的力量。
这次事故从 sentry 第一条告警到完全闭环,经历了 78 小时、6 名工程师参与、3 次 PR、5 条 lint 规则、1 篇内部分享、1 份新人入职材料。回过头看,真正贵的不是那 4.2 万条漏数,而是团队对"看起来无害的循环写法"建立的警惕。这种警惕一旦内化进编码习惯、CI 规则、PR 模板,就成了组织的肌肉记忆,不会因为人员变动而消失。把每一次事故当成组织能力的投资机会,这才是真正的"事故复盘"的意义,而不是写一份没人会再看的 markdown 然后归档进 wiki 角落。
希望读到这里的你,下次写 for k in d: 时,手指停顿一秒,问自己一句:这个循环里,会不会改 d?然后选对修法,不要让自己也成为下一篇事故复盘的主角。这一秒的停顿,可能为你的团队省下一个凌晨 3 点的告警、几小时的排查、和几万条悄无声息漏掉的数据。
—— 别看了 · 2026