2023 年我做一个站内实时消息推送:用户开着网页,不用刷新,就能实时收到新通知、新私信。我选了 WebSocket。第一版我做得很省事:服务端用 WebSocket 库起一个 server,客户端 new WebSocket() 连上去,服务端有消息就 send,客户端 onmessage 收到就显示。本地一测——完美:消息几乎瞬间就到。我心里很踏实:"WebSocket 嘛,不就是建一个长连接,两边随便互发消息。"可等它真正上线、面对真实的用户和真实的网络,一串问题冒了出来。第一种:用户反馈"开着页面挂了一会儿,就再也收不到消息了"——连接被中间的代理悄悄掐断了,可两边都不知道。第二种:用户的网络抖动一下、切了个 wifi,连接断了,然后就永远断了——客户端根本没有重连。第三种:有人手动刷新页面重连上来,可断开的那几分钟里推送的消息,全丢了。第四种:服务端的连接列表越积越多,内存一路涨——大量连接其实早就死了,服务端却还留着它们、还在往里写。最崩溃的一次:我们加到两台服务器,用户 A 连在服务器 1,可他的消息是服务器 2 处理产生的,服务器 2 手里根本没有 A 的连接,消息推不出去。我盯着这一连串问题想了很久才彻底想明白,第一版错在一个根本的认知上:我以为"WebSocket 就是建个长连接,两边随便互发消息就行"。这句话把长连接当成了一根建好就一直在、一直通的管子。可它不是。WebSocket 的长连接,是一个有状态的、会无声无息断掉的、需要持续维护的东西。真正的实时推送工程,核心不在"建立连接"和"收发消息"这两个动作上,而在于:用心跳检测连接还活着没、断了要自动重连、断线期间的消息要能补回来、服务端要及时清理死连接、多实例部署时消息要能跨实例送达。这篇文章就把 WebSocket 实时推送梳理一遍:为什么"建个长连接随便互发"用着用着就崩、心跳保活怎么检测死连接、断线重连为什么要指数退避、断线丢的消息怎么补、服务端连接表怎么管,以及多实例广播、连接鉴权、背压这些把长连接真正做对要避开的坑。
问题背景
先把那次实时推送翻车的现象和我的误判讲清楚,后面所有的设计都是冲着纠正这个误判去的。
现象:一个用"建连接 + 互发消息"搭起来的 WebSocket 推送,上线后接连出事:连接被中间代理悄悄掐断,两边都不知情;网络一抖连接断了就永久掉线;重连后断线期间的消息全丢;服务端连接表堆满死连接内存暴涨;多实例部署后消息推不到连在别的实例上的用户。
我当时的错误认知:"WebSocket 就是建个长连接,两边随便互发消息就行。"
真相:WebSocket 长连接是有状态、会无声断开、需要持续维护的。真正的工程不在"建立"和"收发",而在生命周期管理:用心跳检测连接死活、断线自动重连(指数退避)、用消息序号补回断线期间漏掉的消息、服务端及时清理死连接、多实例靠Redis 发布订阅跨实例投递、连接建立后第一帧鉴权。建连接只是开头,把连接管住才是关键。
要把 WebSocket 推送做对,需要几块认知:
- 为什么"建个长连接随便互发"会崩——连接会无声断开;
- 心跳保活——怎么检测一条已经死掉、却没人通知你的连接;
- 断线重连——客户端怎么用指数退避自动重连;
- 消息可靠性——断线期间漏掉的消息怎么补回来;
- 连接管理、多实例广播、鉴权这些工程坑怎么处理。
一、为什么"建个长连接随便互发"会崩
先把这件最根本的事钉死:WebSocket 连接不是一根"建好就永远畅通"的物理管道,它是一个由两端、以及中间一长串代理、网关、防火墙共同维持的"约定";这条链路上任何一个环节,都可能在你毫不知情的情况下把连接掐掉——而 TCP 的特性决定了,连接的另一端可能很久很久都不会发现"对面已经没了"。
下面这段代码,就是我那个"上线就崩"的第一版服务端——它只管"建连接、发消息":
# 反面教材:只管"建连接、发消息",完全不管连接的死活
import asyncio
import websockets
clients = set()
async def handler(ws):
clients.add(ws) # 连上来就记下
try:
async for message in ws:
pass # 这里处理客户端发来的消息
finally:
clients.remove(ws) # 连接关闭时移除
async def push(message: str):
# 破绽:直接往每个连接写,默认它们全都还活着
for ws in clients:
await ws.send(message)
# 破绽 1:没有心跳 —— 连接被中间代理掐断,服务端毫不知情
# 破绽 2:往一个其实已死的连接 send,要么报错要么石沉大海
# 破绽 3:clients 里堆满死连接,内存只涨不降
# 破绽 4:多实例部署时,push 只能推给"连在本实例"的客户端
这段代码没有任何语法错误,在本地跑得无比顺畅。它的问题不在代码本身,而在一个根本性的误解:它默认"一条 WebSocket 连接,只要我没主动关它,它就一直是好的、一直能发消息"。可真实网络根本不是这样——一条长连接空闲几分钟,中间的负载均衡、NAT 网关、企业防火墙就可能把它当成"没用了"而悄悄回收;用户的手机从 wifi 切到 4G,连接瞬间就断。于是四个破绽逐一爆发:破绽 1——没有心跳,连接被掐断后,服务端的 clients 里还留着它,以为它活着。破绽 2——往这个已死的连接 send,消息石沉大海,用户再也收不到。破绽 3——死连接从不被清理,clients 只涨不降,内存泄漏。破绽 4——push 只能遍历本进程的 clients,多实例下天然推不到别的实例。问题的根子清楚了:WebSocket 工程的真正对象,不是"消息",而是"连接"这个会生老病死的东西。
二、心跳保活:揪出那条没人通知你的死连接
纠正第一版,第一件事就是解决破绽 1:你必须有办法知道一条连接到底还活着没。办法就是心跳——服务端定期给客户端发一个 ping,客户端(浏览器会自动)回一个 pong;只要 pong 按时回来,就说明连接还通;长时间收不到任何回应,就判定这条连接已死,主动把它关掉、清理。
import time
# 每个连接记一个"最近一次确认它还活着"的时间戳
last_seen = {}
HEARTBEAT_INTERVAL = 25 # 每 25 秒 ping 一次
HEARTBEAT_TIMEOUT = 60 # 超过 60 秒没任何回应,判定连接已死
async def heartbeat(ws):
"""定期 ping 客户端;长时间收不到回应,就主动关掉这条连接。"""
while True:
await asyncio.sleep(HEARTBEAT_INTERVAL)
# 太久没收到这个连接的任何动静 —— 判定它已经死了
if time.time() - last_seen.get(ws, 0) > HEARTBEAT_TIMEOUT:
await ws.close() # 主动关闭,触发清理逻辑
return
try:
await ws.ping() # 发一个 ping 探活
except Exception:
await ws.close() # ping 都发不出去,连接已断
return
心跳不能只靠服务端一头。客户端也应该主动发一个应用层的心跳——这有两个作用:一是持续给中间的代理"喂活",让那条连接不因为空闲而被回收;二是让服务端能不断刷新 last_seen:
// 客户端:定期发一个应用层心跳,主动告诉服务端"我还活着"
function startHeartbeat(ws) {
const timer = setInterval(() => {
if (ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({ type: "heartbeat" }));
} else {
clearInterval(timer); // 连接已经不在了,停掉这个心跳定时器
}
}, 25000);
return timer;
}
这两段心跳代码,一个在服务端、一个在客户端,合起来才完整。服务端的 heartbeat 是裁判:它靠 last_seen 这个时间戳判断"这条连接还有没有动静"——只要超时,就果断 close,把破绽 1 的"死连接没人发现"彻底解决。这里的两个时间值有讲究:HEARTBEAT_INTERVAL(心跳间隔)要小于中间代理回收空闲连接的时间(常见的代理超时是 60 秒,所以心跳设 25 秒比较稳);HEARTBEAT_TIMEOUT(判死阈值)要大于心跳间隔的两三倍,留出偶尔丢一两个心跳的余地,免得误杀一条只是网络抖了一下的好连接。客户端的 startHeartbeat 则负责不停地制造动静。心跳能发现连接死了,但发现之后呢?连接断了,客户端得能自己接回来。
三、断线重连:用指数退避把连接自己接回来
破绽 2——"网络一抖就永久掉线"——的根子是:第一版的客户端,连接一断就彻底躺平,没有任何重连逻辑。可断线在真实网络里太正常了——切个网、过个隧道、信号弱一下,连接说断就断。所以客户端必须:在 onclose 事件里自动发起重连。但重连不能"断了就立刻重连、连不上就疯狂重试"——那样在服务端宕机或用户彻底断网时,会变成对服务端的一轮自杀式冲击。正确的做法是指数退避:
// 客户端:断线后用"指数退避"自动重连,而不是放弃、也不是疯狂重试
function connect(url, onMessage) {
let retryDelay = 1000; // 首次重连,等 1 秒
const MAX_DELAY = 30000; // 退避上限:最多等 30 秒
function open() {
const ws = new WebSocket(url);
ws.onopen = () => {
retryDelay = 1000; // 连上了,把退避时间重置
};
ws.onmessage = (e) => onMessage(JSON.parse(e.data));
ws.onclose = () => {
// 关键:连接关闭后,等一段时间再重连,且这个时间逐次翻倍
setTimeout(open, retryDelay);
retryDelay = Math.min(retryDelay * 2, MAX_DELAY);
};
}
open();
}
这段 connect 的精髓,全在 retryDelay 这个变量的一涨一落上。涨:每次 onclose 之后,下次重连的等待时间就翻一倍(1 秒、2 秒、4 秒、8 秒……),并用 MAX_DELAY 封顶。这意味着——如果服务端只是抖了一下,第一次等 1 秒就接回来了,用户几乎无感;而如果服务端真的挂了,重连间隔会迅速拉长到 30 秒一次,不会有成千上万的客户端每秒都来捶这个本就脆弱的服务端。落:一旦 onopen 触发——连接成功了——就立刻把 retryDelay 重置回 1 秒。这一步不能忘:否则用户下一次断线,会直接从上次退避到的那个很长的间隔开始等,体验很糟。重连能接回连接了,但这里藏着一个更扎心的问题:连接是接回来了,可断开的那几分钟里,服务端推的消息,去哪了?
四、消息可靠性:把断线期间漏掉的消息补回来
破绽 3——"重连后,断线期间的消息全丢了"——是实时推送里最伤用户信任的问题。原因很直接:客户端断线的那段时间,它根本不在线,服务端往它已死的连接推的消息,全部丢失;等它重连回来,服务端只会推新的,断档期的消息不会重来。要补上这个洞,核心思路是:给每条消息一个全局递增的序号,让客户端能"对账"——发现自己漏了哪些。
import json
# 服务端:每条推送消息,都带一个【全局递增的序号 seq】
_seq = 0
_history = [] # 近期消息留个底,用于断线补偿
def make_message(payload: dict) -> dict:
"""生产一条带 seq 的消息,并存进近期历史。"""
global _seq
_seq += 1
msg = {"seq": _seq, "payload": payload}
_history.append(msg)
if len(_history) > 1000: # 只保留最近 1000 条,避免无限增长
_history.pop(0)
return msg
有了序号和历史留底,补偿就有了着落。客户端重连时,把自己收到的最后一个 seq 报给服务端,服务端一对账,就知道该补发哪些:
async def on_client_resume(ws, last_seq: int):
"""客户端重连时带上它收到的最后一个 seq,服务端补发缺失的消息。"""
# 从历史里挑出所有"比客户端的 last_seq 更新"的消息
missed = [m for m in _history if m["seq"] > last_seq]
for m in missed:
# 把断线期间漏掉的消息,按顺序补发给这个刚重连的客户端
await ws.send(json.dumps(m))
# 客户端据此就能"无缝衔接",不会因为断线丢消息
这两段代码,把"断线必丢消息"这个洞补上了。make_message 干了两件事:给每条消息盖一个递增的 seq,再把它存进 _history。客户端那边,只要记住自己收到的最大 seq,就拥有了一个"对账凭据"。on_client_resume 则是补偿的执行者:客户端重连时报上 last_seq,服务端把 _history 里所有比它新的消息按顺序补发一遍——断档期的消息就回来了。这里有两个工程上的取舍要清醒:其一,_history 不能无限大,所以只留最近 1000 条——这意味着如果用户断线太久、漏掉的消息超出了历史窗口,就补不全了,这种情况通常退化成提示用户"刷新看全部"。其二,补发要保证不重不漏、严格按 seq 顺序,客户端收到后也要按 seq 去重——因为极端情况下一条消息可能既被实时推过、又被补发一次。消息可靠性解决了,接下来回头处理破绽 3 之外那个一直在悄悄漏内存的破绽。
五、连接管理:一张管得住生死的连接表
破绽 3(内存泄漏)的根子,是第一版那个 clients = set() ——它只管 add,几乎不 remove,死连接越积越多。而且它只是一个 set,你没法"找到某个特定用户的连接"——可推送恰恰需要"把这条消息发给用户 X"。所以要把它升级成一张正经的连接表:以 user_id 为键,并且每一次写失败,都立刻意味着"这条连接已死",当即清理。
class ConnectionRegistry:
"""统一管理所有活动连接 —— 能按用户查找,也能识别、清理死连接。"""
def __init__(self):
self._conns = {} # user_id -> websocket
def add(self, user_id: str, ws):
old = self._conns.get(user_id)
if old is not None:
# 同一用户的旧连接先关掉:重复登录、重连残留都靠这清理
asyncio.create_task(old.close())
self._conns[user_id] = ws
def remove(self, user_id: str):
self._conns.pop(user_id, None)
async def send(self, user_id: str, message: str) -> bool:
ws = self._conns.get(user_id)
if ws is None:
return False # 用户当前没有在线连接
try:
await ws.send(message)
return True
except Exception:
# 关键:send 失败 = 这条连接已死,立刻从表里清掉
self.remove(user_id)
return False
这个 ConnectionRegistry 和第一版那个 set 的差距,体现在三个细节上。其一,用 user_id 做键——这样推送才能精确地"发给某个人",而不是无差别广播。其二,add 时主动关掉同一用户的旧连接:用户重连、多开页面、断线残留,都会让一个 user_id 对应出好几条连接——不清理旧的,既漏内存,又会让推送不知道该发哪条。其三,也是最关键的:send 方法把"发送"和"清理"绑在了一起——任何一次 send 抛异常,都立刻判定这条连接已死,当场 remove。这是一种极其有效的被动清理:就算心跳一时没发现某条死连接,只要下一次推送碰到它,它就会被清掉。连接表能管住单机的连接了,但破绽 4 那个多实例的难题,还原封不动。
六、工程坑:多实例广播、连接鉴权与背压
五块设计之外,还有几个工程坑,不处理就会在生产上栽跟头。坑 1:多实例部署,本地连接表就不够用了——要靠一个外部的"广播中枢"。这是破绽 4 的解法。用户 A 连在实例 1,他的消息却可能在实例 2 上产生;实例 2 的 registry 里根本没有 A。解法是:谁也不直接 send,所有消息都先丢进一个 Redis 频道;每个实例都订阅这个频道,各自把消息发给"恰好连在自己身上"的用户。
import redis.asyncio as aioredis
redis = aioredis.from_url("redis://localhost")
registry = ConnectionRegistry()
async def publish_to_user(user_id: str, message: str):
"""发消息不直接 send,而是丢进 Redis 频道 —— 让所有实例都收到。"""
await redis.publish("ws_push", json.dumps(
{"user_id": user_id, "message": message}))
async def subscribe_loop():
"""每个实例都订阅同一个频道;只把消息发给【连在自己身上】的用户。"""
pubsub = redis.pubsub()
await pubsub.subscribe("ws_push")
async for item in pubsub.listen():
if item["type"] != "message":
continue
data = json.loads(item["data"])
# 用户没连在本实例时,registry.send 返回 False,自然被忽略
await registry.send(data["user_id"], data["message"])
坑 2:连接建立后的第一件事,必须是鉴权。WebSocket 的握手不像 HTTP 请求那样每次都带 Cookie 和鉴权头那么直观,很容易裸奔。正确的做法是:连接一建立,就要求客户端先发一帧带 token 的消息,校验通过才登记进连接表,否则立刻关闭。
async def handler(ws):
# 关键:连接建立后的第一件事 —— 鉴权,而不是直接信任并登记
try:
first = await asyncio.wait_for(ws.recv(), timeout=5)
token = json.loads(first).get("token")
user_id = verify_token(token) # 校验失败会抛异常
except Exception:
await ws.close(code=4001) # 鉴权失败,直接关闭
return
registry.add(user_id, ws)
asyncio.create_task(heartbeat(ws)) # 鉴权过了,才启动心跳
try:
async for _ in ws:
last_seen[ws] = time.time() # 收到任何帧都刷新存活时间
finally:
registry.remove(user_id) # 连接结束,务必清理
坑 3:小心背压(backpressure)。如果服务端推消息的速度,超过了某个客户端消费的速度(比如对方网络很慢),消息会在服务端的发送缓冲区里越堆越多,吃光内存。要给单个连接的发送队列设上限,堆积超限时该限流限流、该丢弃旧消息丢弃旧消息。坑 4:区分协议层心跳和应用层心跳。WebSocket 协议自带 ping/pong 帧,但有些中间代理不把它算作"活动流量";所以稳妥的做法是再叠一层应用层的心跳消息(就像第二节客户端那样),双保险。坑 5:能用 HTTP 解决的,别硬上 WebSocket。WebSocket 的全部复杂度都来自"长连接要一直维护"。如果你的场景只是偶尔查个状态、实时性要求不高,一个简单的轮询或 SSE(服务端推送事件)可能就够了,没必要背上长连接管理的全部包袱。下面这张图,把一条连接从建立到重连的完整生命周期串起来:
关键概念速查
| 概念 / 手段 | 说明 |
|---|---|
| WebSocket 长连接 | 一条有状态会断开的全双工连接,不是建好就一劳永逸 |
| 心跳保活 | 定期 ping pong 探活,检测被中间代理悄悄掐断的死连接 |
| 死连接 | 物理已断但服务端不知情的连接,会堆积内存还收不到消息 |
| 断线重连 | 客户端 onclose 后自动重连,别让用户因网络抖动永久掉线 |
| 指数退避 | 重连间隔逐次翻倍并设上限,断网时不会高频冲击服务端 |
| 消息序号 | 每条推送带全局递增 seq,客户端据此对账发现自己漏了消息 |
| 断线补偿 | 重连时带上最后收到的 seq,服务端补发断线期间漏掉的消息 |
| 连接表 | 服务端用 user_id 到连接的映射管理在线连接,写失败即时清理 |
| 多实例广播 | 多机部署用 Redis 发布订阅,让消息能跨实例送到目标用户 |
| 连接鉴权 | 连接建立后第一帧做鉴权,失败立刻关闭,不裸奔接收消息 |
避坑清单
- WebSocket 连接会被中间代理悄悄掐断,必须做心跳保活,别假设连接一直在。
- 心跳要双向,服务端定期 ping,长时间收不到回应就主动关掉死连接。
- 客户端必须实现断线自动重连,onclose 后重新建连,别让用户永久掉线。
- 重连要用指数退避,间隔逐次翻倍设上限,避免断网时高频冲击服务端。
- 断线期间会丢消息,每条推送带递增序号,重连时带 last seq 让服务端补发。
- 服务端要及时清理死连接,往连接 send 失败就立刻从连接表移除。
- 连接表别只增不减,同一用户重复连接要先关掉旧连接再登记新的。
- 多实例部署时本地连接表不够用,要靠 Redis 发布订阅跨实例投递消息。
- 连接建立后第一件事是鉴权,鉴权失败立刻关闭,别裸奔接收消息。
- 注意背压,客户端消费慢时发送队列会堆积,要限流或丢弃旧消息。
总结
回头看那次"用户开着页面挂一会儿就再也收不到消息、一断线就永久掉线"的事故,以及我后来在 WebSocket 上接连踩的坑,最该记住的不是某一段心跳或重连代码,而是我动手前那个想当然的判断——"WebSocket 就是建个长连接,两边随便互发消息"。这句话错在它把长连接想象成了一根接通就一直畅通的水管。我以为实时推送的难点是"怎么把消息又快又准地发出去",而连接不过是发消息之前那个一次性的、建好就不用管的前提。可事实恰恰相反:消息怎么发是简单的,真正难的、需要持续投入工程量的,是"维护连接"本身。一条 WebSocket 连接,从它建立的那一刻起,就一直处在"可能正在悄悄死去"的状态——它会被代理掐、被网络断、被防火墙回收。实时推送这件事想清楚的,正是这个:它表面上是个"发消息"的功能,本质上是一套"连接生命周期的管理系统"——你管理的不是消息,是成千上万条随时可能生老病死的连接。
所以做 WebSocket 推送,真正的工程量不在"new WebSocket()"和"ws.send()"这两行上。那两行,任何教程的第一页就教完了。真正的工程量,在于你要为"连接会无声无息地断"这个事实,处理掉它引发的所有连锁后果:连接会悄悄死,你就得用心跳去持续探活;连接断了,你就得让客户端用指数退避自动接回来;断线必然漏消息,你就得用序号和历史留底把漏掉的补回来;死连接会堆积,你就得有一张能识别、能自清理的连接表;一旦多实例,你就得用一个外部广播中枢让消息跨实例送达。这篇文章的几节,其实就是顺着这条思路展开的:先想清楚"建个长连接随便互发"为什么会崩,再用心跳揪出死连接,用指数退避把断掉的连接接回来,用消息序号补回断线期间的漏单,用一张正经的连接表管住连接的生死,最后是多实例广播、连接鉴权、背压这几个把实时推送做扎实的工程细节。
你会发现,WebSocket 的思路,和现实里怎么维持一通长时间的电话完全相通。你给朋友打了个电话,这通电话接通的那一刻,只是开始,不是万事大吉。如果两个人都长时间不说话,你没法确定对面是还在听、还是早就走开了——所以你会时不时"喂?还在吗?"地确认一下(这就是心跳)。电话突然断了,一个在乎这通电话的人,会马上回拨;要是一直拨不通,他会过一会儿再试,而不是每秒狂拨(这就是指数退避重连)。重新接通后,他会问一句"刚才断的时候你说到哪了?",好把错过的内容补上(这就是断线消息补偿)。而一个电话总机,必须时刻清楚每一条线路接的是谁、哪条线已经空了,才能把找某人的电话准确转过去(这就是连接表)。一通电话的质量,从来不取决于你拨号那一下有多顺,而取决于接通之后,你有没有一直在确认"对面那个人,还在不在线上"。
最后想说,WebSocket 推送做没做扎实,差距永远不会在本地测试时暴露——本地的网络又快又稳,连接建好就不会断,你写完建连接、发消息这几行,会觉得"WebSocket 不就这么回事"。它只在真实的、用户会切网、会锁屏、会挂着页面去开会、中间隔着一堆你看不见的代理和防火墙的生产环境里才显形。那时候它会用最难堪的方式给你结账:做不好,你会像我一样,看着用户抱怨"消息时灵时不灵",看着服务端的连接数只涨不跌、内存一路报警,看着加了第二台机器后一半用户的推送凭空消失;而做对了,无论用户的网络怎么抖、怎么切,你的推送都能稳稳地工作:连接断了能自己接回来,断线漏掉的消息能补上,死连接被及时清走,消息无论用户连在哪台机器都能送到。所以别等用户抱怨"收不到消息"才去补心跳和重连,在你决定"用 WebSocket 做实时推送"的那一刻就该想清楚:这条连接断了我怎么知道?断了怎么接回来?断的时候漏的消息怎么办?这几个问题都有了答案,你的 WebSocket 才不只是一个"本地看起来能实时收消息"的 demo,而是一套能在真实糟糕网络里稳稳把消息送到用户眼前的可靠系统。
—— 别看了 · 2026