观察者模式是最常被用、也最常被误用的设计模式。前端的事件监听、后端的消息总线、数据库的 trigger、Spring 的 ApplicationEvent、Vue/React 的响应式系统,本质都是观察者。这篇文章把它从概念讲到实现,再到三个高级变体(发布订阅、响应式、RxJS),让你以后看到任何"X 变了通知 Y"的需求,都能直接判断该用哪种。
问题的本质:解耦"事件源"和"处理者"
想象一个简单需求:用户下单成功后,要发邮件、发短信、加积分、更新统计。最朴素的写法:
function placeOrder(order) {
saveToDB(order);
sendEmail(order.user); // 紧耦合
sendSMS(order.user); // 紧耦合
addPoints(order.user, order.total);
updateStats(order);
// 以后再加一个推送 IM 通知,还要回来改这个函数
}
问题:placeOrder 知道太多东西。新加一个通知方式,就要改它一次,违反开闭原则(对扩展开放,对修改关闭)。观察者模式给你一个出口:让 placeOrder 只做"通知有事发生了",其他模块"自己来关心"。
最简单的观察者:Subject + Observer
// 经典 GoF 风格
class Subject {
constructor() { this.observers = []; }
subscribe(obs) { this.observers.push(obs); }
unsubscribe(obs) {
this.observers = this.observers.filter((o) => o !== obs);
}
notify(data) {
this.observers.forEach((obs) => obs.update(data));
}
}
class EmailObserver {
update(order) { console.log(`发邮件给 ${order.user}`); }
}
class SMSObserver {
update(order) { console.log(`发短信给 ${order.user}`); }
}
const orderSubject = new Subject();
orderSubject.subscribe(new EmailObserver());
orderSubject.subscribe(new SMSObserver());
function placeOrder(order) {
saveToDB(order);
orderSubject.notify(order); // 一行搞定所有后续动作
}
对比前面紧耦合的版本,优点立刻显现:
- 新增观察者只需要
subscribe,不动placeOrder。 - 临时关闭某个通知只需要
unsubscribe,或者干脆把那个 observer 从注册列表移除。 - 测试时可以 mock 整个 Subject,验证 notify 被调用了几次、参数是什么。
EventEmitter:更工程化的观察者
GoF 风格的 Subject 一次只能发一种事件。真实业务里一个对象通常有多种事件 —— 订单有 created / paid / shipped / cancelled 多个状态变化。这时用 EventEmitter:
class EventEmitter {
constructor() { this.handlers = {}; }
on(event, fn) {
(this.handlers[event] ??= []).push(fn);
return () => this.off(event, fn); // 返回取消订阅函数
}
once(event, fn) {
const wrap = (...args) => { fn(...args); this.off(event, wrap); };
return this.on(event, wrap);
}
off(event, fn) {
if (!this.handlers[event]) return;
this.handlers[event] = this.handlers[event].filter((h) => h !== fn);
}
emit(event, ...args) {
const list = this.handlers[event];
if (!list) return;
// 拷贝一份再遍历:防止 handler 里 off 导致迭代异常
[...list].forEach((h) => {
try { h(...args); } catch (e) { console.error(e); }
});
}
}
const order = new EventEmitter();
order.on('paid', (o) => sendReceipt(o));
order.on('paid', (o) => addPoints(o.user, o.total));
order.on('cancelled', refund);
order.emit('paid', { user: 'mores', total: 100 });
Node.js 内置的 events.EventEmitter、浏览器的 EventTarget、Vue 2 的 $on/$emit,都是同一思路的不同实现。
同步还是异步
EventEmitter 默认是同步的 —— emit 会顺序调用所有 handler,任何一个 handler 抛异常会影响后面的(除非像上面那样 try/catch)。如果 handler 里有耗时操作,会拖慢 emit 调用方。两种改造方式:
// 1. 把每个 handler 放到下一 tick
emit(event, ...args) {
[...(this.handlers[event] || [])].forEach((h) => {
queueMicrotask(() => h(...args));
});
}
// 2. 返回 Promise,可 await
async emitAsync(event, ...args) {
const list = this.handlers[event] || [];
for (const h of list) {
await h(...args); // 顺序异步,所有 handler 完成才返回
}
}
发布订阅模式:多了一层"中介"
观察者模式里 Subject 直接持有 Observer 列表。发布订阅(Pub/Sub)更进一步 —— 引入一个中介(broker / event bus),Publisher 把消息发给 broker,Subscriber 也是从 broker 拿消息。Publisher 不知道有哪些 Subscriber,Subscriber 也不知道是谁发的。
// 一个简单的进程内 EventBus
class EventBus {
constructor() { this.topics = {}; }
subscribe(topic, fn) {
(this.topics[topic] ??= []).push(fn);
}
publish(topic, data) {
(this.topics[topic] || []).forEach((fn) => fn(data));
}
}
const bus = new EventBus();
// A 模块:订阅
bus.subscribe('order.paid', (o) => sendEmail(o));
bus.subscribe('order.paid', (o) => addPoints(o));
// B 模块:发布(完全不知道 A 的存在)
bus.publish('order.paid', { user: 'mores' });
这种解耦在大型前端应用里特别有用 —— 但要小心:全局事件总线很容易演变成"看不见的依赖",出 bug 不知道是谁触发了谁。我的经验:EventBus 只用于真正跨模块、低频、可丢失的通知。频繁、强逻辑的通信走显式的函数调用或状态管理。
响应式编程:观察者模式的极致
Vue 和 MobX 的响应式系统,本质是观察者模式的"自动化"版本 —— 你不再手动调用 subscribe / emit,而是声明"这个值依赖那个值",框架自动追踪依赖、自动通知更新。
// 一个极简的响应式系统(70 行)
let activeEffect = null;
function effect(fn) {
activeEffect = fn;
fn(); // 跑一次,触发依赖收集
activeEffect = null;
}
function reactive(target) {
const deps = new Map(); // 每个属性的订阅者列表
return new Proxy(target, {
get(obj, key) {
if (activeEffect) {
if (!deps.has(key)) deps.set(key, new Set());
deps.get(key).add(activeEffect); // 收集
}
return Reflect.get(obj, key);
},
set(obj, key, value) {
const r = Reflect.set(obj, key, value);
const list = deps.get(key);
if (list) list.forEach((fn) => fn()); // 通知
return r;
},
});
}
// 使用:
const state = reactive({ count: 0, name: 'mores' });
effect(() => {
console.log(`count = ${state.count}`); // 这里读 count,自动订阅 count
});
state.count = 1; // 自动重新执行上面的 effect,打印 count = 1
state.count = 2; // 再次打印 count = 2
state.name = 'x'; // effect 没读 name,不重新执行
这就是 Vue 3 响应式 + Effect 的核心机制(真实实现更复杂,处理嵌套对象、数组、批量更新、深度依赖等)。理解了它,Vue 的 computed 和 watch、MobX 的 observable、Solid 的 Signal,你都能看懂。
RxJS:观察者模式 + Iterator + 数据流
RxJS 把观察者模式推到极致:把"一系列异步事件"看作"流",所有变换、过滤、合并都是流操作。前面写的 EventEmitter 类似 Subject,而 RxJS 加了一整套操作符。
import { fromEvent, map, filter, debounceTime, switchMap } from 'rxjs';
// 把搜索框的输入做成一个数据流
const input$ = fromEvent(document.querySelector('#search'), 'input');
input$
.pipe(
map((e) => e.target.value),
filter((q) => q.length >= 2),
debounceTime(300), // 防抖
switchMap((q) => fetch(`/api/search?q=${q}`).then((r) => r.json())),
// switchMap 自动取消上一个未完成的请求
)
.subscribe((results) => render(results));
同样的需求用 EventEmitter 写,你需要手动管定时器、手动取消上一个 fetch、手动判断长度;用 RxJS,这些都是声明式的链式操作符。当业务里"复杂异步流"很多时,RxJS 的复用性立刻显现。代价是学习曲线 —— 几十个操作符要记一段时间。
常见坑
坑 1:忘记取消订阅,内存泄漏。 组件卸载时如果没 off,handler 会一直被 Subject 引用,handler 又通常闭包引用了组件实例,整个组件链路无法被 GC。React 里这就是为什么 useEffect 一定要返回清理函数。
useEffect(() => {
const handler = (data) => setX(data);
bus.on('event', handler);
return () => bus.off('event', handler); // 必须!
}, []);
坑 2:在 handler 里又订阅或退订,改了正在迭代的数组。 这就是为什么前面 emit 实现里要 [...list].forEach 先拷一份。
坑 3:链式触发导致死循环。 A 事件触发 B handler,B handler 又 emit A 事件,瞬间栈溢出。开发时打开 EventEmitter 的 max listeners 警告,生产时考虑加防护(同一事件最大递归深度)。
坑 4:订阅顺序敏感的业务逻辑写进 handler。 观察者本质是无序的(注册顺序不应该被依赖)。如果某些处理必须按顺序,要么用专门的中间件链(类似 Express 的 middleware),要么让一个 handler 里完成整段流程。
跨进程的观察者:消息队列
把观察者模式从进程内放大到分布式,就是消息队列(MQ)。Publisher 发消息到 broker,Subscriber 订阅 topic 拿消息 —— 心智模型完全一样,只是 broker 跑在另一台机器上。
# RabbitMQ 风格(AMQP)
publisher = new Channel().exchangeDeclare('orders', 'topic')
publisher.publish('orders', 'order.paid', orderJson)
subscriber.queueBind(myQueue, 'orders', 'order.*')
subscriber.consume(myQueue, msg => processOrder(JSON.parse(msg)))
# Kafka 风格
producer.send({ topic: 'orders', key: orderId, value: orderJson })
consumer.subscribe(['orders'])
consumer.eachMessage(({ topic, partition, message }) => { ... })
分布式观察者带来一组进程内观察者没有的工程问题:消息丢失(broker 宕机)、消息重复(consumer 处理完没 ack 又重发)、消息顺序(多 partition 时同一 key 必须落到同一 partition 才有序)、背压(订阅者处理不过来)。这些都是消息队列要解决的核心问题,但思想根上还是那套观察者:谁不知道谁。
实战:用响应式重写一个表单验证
把响应式思想用到具体 UI 场景。下面是一个表单,要求实时校验:邮箱合法 + 密码强 + 两次密码一致,所有满足才能提交。
// 用前面那 70 行的极简响应式系统
const form = reactive({ email: '', password: '', confirm: '' });
const errors = reactive({});
effect(() => {
errors.email = /^.+@.+\..+$/.test(form.email) ? '' : '邮箱格式错';
});
effect(() => {
errors.password = form.password.length >= 8 ? '' : '密码至少 8 位';
});
effect(() => {
errors.confirm = form.password === form.confirm ? '' : '两次密码不一致';
});
effect(() => {
const canSubmit = !errors.email && !errors.password && !errors.confirm
&& form.email && form.password && form.confirm;
submitBtn.disabled = !canSubmit;
});
四个独立的 effect,每个只关心自己依赖的字段。改了 email,只有第一个和最后一个 effect 重跑。这比写"on input,跑一遍所有校验"那种命令式代码清晰得多 —— 而且状态和派生状态的关系是声明出来的,新加一个校验规则只是新加一个 effect,不动其他代码。
写在最后
观察者模式的精髓不是模板代码,而是"谁知道谁"这个问题的答案:Subject 不知道 Observer 是谁,只知道有人来订阅了它的事件;Observer 不需要知道事件源的内部,只需要响应它感兴趣的事件。这种"反向依赖"是解耦的关键 —— 想加新功能,新增订阅者;想暂停某个功能,取消订阅。源头始终不动。
选型一句话:同一对象内、状态变化触发更新,用观察者/响应式;跨模块、跨进程的事件通知,用发布订阅;复杂异步事件流的变换,用 RxJS。把这三个层次分开,你的代码就既灵活又不会乱。
—— 别看了 · 2026