观察者模式的四种现代演化:从 EventEmitter 到响应式与 RxJS

观察者模式是最常被用、也最常被误用的设计模式。前端的事件监听、后端的消息总线、数据库的 trigger、Spring 的 ApplicationEvent、Vue/React 的响应式系统,本质都是观察者。这篇文章把它从概念讲到实现,再到三个高级变体(发布订阅、响应式、RxJS),让你以后看到任何"X 变了通知 Y"的需求,都能直接判断该用哪种。

问题的本质:解耦"事件源"和"处理者"

想象一个简单需求:用户下单成功后,要发邮件、发短信、加积分、更新统计。最朴素的写法:

function placeOrder(order) {
    saveToDB(order);
    sendEmail(order.user);          // 紧耦合
    sendSMS(order.user);            // 紧耦合
    addPoints(order.user, order.total);
    updateStats(order);
    // 以后再加一个推送 IM 通知,还要回来改这个函数
}

问题:placeOrder 知道太多东西。新加一个通知方式,就要改它一次,违反开闭原则(对扩展开放,对修改关闭)。观察者模式给你一个出口:让 placeOrder 只做"通知有事发生了",其他模块"自己来关心"。

最简单的观察者:Subject + Observer

// 经典 GoF 风格
class Subject {
    constructor() { this.observers = []; }
    subscribe(obs) { this.observers.push(obs); }
    unsubscribe(obs) {
        this.observers = this.observers.filter((o) => o !== obs);
    }
    notify(data) {
        this.observers.forEach((obs) => obs.update(data));
    }
}

class EmailObserver {
    update(order) { console.log(`发邮件给 ${order.user}`); }
}
class SMSObserver {
    update(order) { console.log(`发短信给 ${order.user}`); }
}

const orderSubject = new Subject();
orderSubject.subscribe(new EmailObserver());
orderSubject.subscribe(new SMSObserver());

function placeOrder(order) {
    saveToDB(order);
    orderSubject.notify(order);     // 一行搞定所有后续动作
}

对比前面紧耦合的版本,优点立刻显现:

  • 新增观察者只需要 subscribe,不动 placeOrder
  • 临时关闭某个通知只需要 unsubscribe,或者干脆把那个 observer 从注册列表移除。
  • 测试时可以 mock 整个 Subject,验证 notify 被调用了几次、参数是什么。

EventEmitter:更工程化的观察者

GoF 风格的 Subject 一次只能发一种事件。真实业务里一个对象通常有多种事件 —— 订单有 created / paid / shipped / cancelled 多个状态变化。这时用 EventEmitter:

class EventEmitter {
    constructor() { this.handlers = {}; }

    on(event, fn) {
        (this.handlers[event] ??= []).push(fn);
        return () => this.off(event, fn);   // 返回取消订阅函数
    }
    once(event, fn) {
        const wrap = (...args) => { fn(...args); this.off(event, wrap); };
        return this.on(event, wrap);
    }
    off(event, fn) {
        if (!this.handlers[event]) return;
        this.handlers[event] = this.handlers[event].filter((h) => h !== fn);
    }
    emit(event, ...args) {
        const list = this.handlers[event];
        if (!list) return;
        // 拷贝一份再遍历:防止 handler 里 off 导致迭代异常
        [...list].forEach((h) => {
            try { h(...args); } catch (e) { console.error(e); }
        });
    }
}

const order = new EventEmitter();
order.on('paid', (o) => sendReceipt(o));
order.on('paid', (o) => addPoints(o.user, o.total));
order.on('cancelled', refund);

order.emit('paid', { user: 'mores', total: 100 });

Node.js 内置的 events.EventEmitter、浏览器的 EventTarget、Vue 2 的 $on/$emit,都是同一思路的不同实现。

同步还是异步

EventEmitter 默认是同步的 —— emit 会顺序调用所有 handler,任何一个 handler 抛异常会影响后面的(除非像上面那样 try/catch)。如果 handler 里有耗时操作,会拖慢 emit 调用方。两种改造方式:

// 1. 把每个 handler 放到下一 tick
emit(event, ...args) {
    [...(this.handlers[event] || [])].forEach((h) => {
        queueMicrotask(() => h(...args));
    });
}

// 2. 返回 Promise,可 await
async emitAsync(event, ...args) {
    const list = this.handlers[event] || [];
    for (const h of list) {
        await h(...args);    // 顺序异步,所有 handler 完成才返回
    }
}

发布订阅模式:多了一层"中介"

观察者模式里 Subject 直接持有 Observer 列表。发布订阅(Pub/Sub)更进一步 —— 引入一个中介(broker / event bus),Publisher 把消息发给 broker,Subscriber 也是从 broker 拿消息。Publisher 不知道有哪些 Subscriber,Subscriber 也不知道是谁发的。

// 一个简单的进程内 EventBus
class EventBus {
    constructor() { this.topics = {}; }
    subscribe(topic, fn) {
        (this.topics[topic] ??= []).push(fn);
    }
    publish(topic, data) {
        (this.topics[topic] || []).forEach((fn) => fn(data));
    }
}

const bus = new EventBus();
// A 模块:订阅
bus.subscribe('order.paid', (o) => sendEmail(o));
bus.subscribe('order.paid', (o) => addPoints(o));
// B 模块:发布(完全不知道 A 的存在)
bus.publish('order.paid', { user: 'mores' });

这种解耦在大型前端应用里特别有用 —— 但要小心:全局事件总线很容易演变成"看不见的依赖",出 bug 不知道是谁触发了谁。我的经验:EventBus 只用于真正跨模块、低频、可丢失的通知。频繁、强逻辑的通信走显式的函数调用或状态管理。

响应式编程:观察者模式的极致

Vue 和 MobX 的响应式系统,本质是观察者模式的"自动化"版本 —— 你不再手动调用 subscribe / emit,而是声明"这个值依赖那个值",框架自动追踪依赖、自动通知更新。

// 一个极简的响应式系统(70 行)
let activeEffect = null;

function effect(fn) {
    activeEffect = fn;
    fn();                       // 跑一次,触发依赖收集
    activeEffect = null;
}

function reactive(target) {
    const deps = new Map();     // 每个属性的订阅者列表
    return new Proxy(target, {
        get(obj, key) {
            if (activeEffect) {
                if (!deps.has(key)) deps.set(key, new Set());
                deps.get(key).add(activeEffect);   // 收集
            }
            return Reflect.get(obj, key);
        },
        set(obj, key, value) {
            const r = Reflect.set(obj, key, value);
            const list = deps.get(key);
            if (list) list.forEach((fn) => fn());  // 通知
            return r;
        },
    });
}

// 使用:
const state = reactive({ count: 0, name: 'mores' });

effect(() => {
    console.log(`count = ${state.count}`);   // 这里读 count,自动订阅 count
});

state.count = 1;   // 自动重新执行上面的 effect,打印 count = 1
state.count = 2;   // 再次打印 count = 2
state.name = 'x';  // effect 没读 name,不重新执行

这就是 Vue 3 响应式 + Effect 的核心机制(真实实现更复杂,处理嵌套对象、数组、批量更新、深度依赖等)。理解了它,Vue 的 computedwatch、MobX 的 observable、Solid 的 Signal,你都能看懂。

RxJS:观察者模式 + Iterator + 数据流

RxJS 把观察者模式推到极致:把"一系列异步事件"看作"流",所有变换、过滤、合并都是流操作。前面写的 EventEmitter 类似 Subject,而 RxJS 加了一整套操作符

import { fromEvent, map, filter, debounceTime, switchMap } from 'rxjs';

// 把搜索框的输入做成一个数据流
const input$ = fromEvent(document.querySelector('#search'), 'input');

input$
    .pipe(
        map((e) => e.target.value),
        filter((q) => q.length >= 2),
        debounceTime(300),             // 防抖
        switchMap((q) => fetch(`/api/search?q=${q}`).then((r) => r.json())),
        // switchMap 自动取消上一个未完成的请求
    )
    .subscribe((results) => render(results));

同样的需求用 EventEmitter 写,你需要手动管定时器、手动取消上一个 fetch、手动判断长度;用 RxJS,这些都是声明式的链式操作符。当业务里"复杂异步流"很多时,RxJS 的复用性立刻显现。代价是学习曲线 —— 几十个操作符要记一段时间。

常见坑

坑 1:忘记取消订阅,内存泄漏。 组件卸载时如果没 off,handler 会一直被 Subject 引用,handler 又通常闭包引用了组件实例,整个组件链路无法被 GC。React 里这就是为什么 useEffect 一定要返回清理函数。

useEffect(() => {
    const handler = (data) => setX(data);
    bus.on('event', handler);
    return () => bus.off('event', handler);   // 必须!
}, []);

坑 2:在 handler 里又订阅或退订,改了正在迭代的数组。 这就是为什么前面 emit 实现里要 [...list].forEach 先拷一份。

坑 3:链式触发导致死循环。 A 事件触发 B handler,B handler 又 emit A 事件,瞬间栈溢出。开发时打开 EventEmitter 的 max listeners 警告,生产时考虑加防护(同一事件最大递归深度)。

坑 4:订阅顺序敏感的业务逻辑写进 handler。 观察者本质是无序的(注册顺序不应该被依赖)。如果某些处理必须按顺序,要么用专门的中间件链(类似 Express 的 middleware),要么让一个 handler 里完成整段流程。

跨进程的观察者:消息队列

把观察者模式从进程内放大到分布式,就是消息队列(MQ)。Publisher 发消息到 broker,Subscriber 订阅 topic 拿消息 —— 心智模型完全一样,只是 broker 跑在另一台机器上。

# RabbitMQ 风格(AMQP)
publisher = new Channel().exchangeDeclare('orders', 'topic')
publisher.publish('orders', 'order.paid', orderJson)

subscriber.queueBind(myQueue, 'orders', 'order.*')
subscriber.consume(myQueue, msg => processOrder(JSON.parse(msg)))

# Kafka 风格
producer.send({ topic: 'orders', key: orderId, value: orderJson })

consumer.subscribe(['orders'])
consumer.eachMessage(({ topic, partition, message }) => { ... })

分布式观察者带来一组进程内观察者没有的工程问题:消息丢失(broker 宕机)、消息重复(consumer 处理完没 ack 又重发)、消息顺序(多 partition 时同一 key 必须落到同一 partition 才有序)、背压(订阅者处理不过来)。这些都是消息队列要解决的核心问题,但思想根上还是那套观察者:谁不知道谁。

实战:用响应式重写一个表单验证

把响应式思想用到具体 UI 场景。下面是一个表单,要求实时校验:邮箱合法 + 密码强 + 两次密码一致,所有满足才能提交。

// 用前面那 70 行的极简响应式系统
const form = reactive({ email: '', password: '', confirm: '' });
const errors = reactive({});

effect(() => {
    errors.email = /^.+@.+\..+$/.test(form.email) ? '' : '邮箱格式错';
});
effect(() => {
    errors.password = form.password.length >= 8 ? '' : '密码至少 8 位';
});
effect(() => {
    errors.confirm = form.password === form.confirm ? '' : '两次密码不一致';
});

effect(() => {
    const canSubmit = !errors.email && !errors.password && !errors.confirm
        && form.email && form.password && form.confirm;
    submitBtn.disabled = !canSubmit;
});

四个独立的 effect,每个只关心自己依赖的字段。改了 email,只有第一个和最后一个 effect 重跑。这比写"on input,跑一遍所有校验"那种命令式代码清晰得多 —— 而且状态和派生状态的关系是声明出来的,新加一个校验规则只是新加一个 effect,不动其他代码。

写在最后

观察者模式的精髓不是模板代码,而是"谁知道谁"这个问题的答案:Subject 不知道 Observer 是谁,只知道有人来订阅了它的事件;Observer 不需要知道事件源的内部,只需要响应它感兴趣的事件。这种"反向依赖"是解耦的关键 —— 想加新功能,新增订阅者;想暂停某个功能,取消订阅。源头始终不动。

选型一句话:同一对象内、状态变化触发更新,用观察者/响应式;跨模块、跨进程的事件通知,用发布订阅;复杂异步事件流的变换,用 RxJS。把这三个层次分开,你的代码就既灵活又不会乱。

—— 别看了 · 2026
声明:本站所有文章,如无特殊说明或标注,均为本站原创发布。任何个人或组织,在未征得本站同意时,禁止复制、盗用、采集、发布本站内容到任何网站、书籍等各类媒体平台。如若本站内容侵犯了原著者的合法权益,可联系我们进行处理 邮箱1846861578@qq.com。
技术教程

红黑树完全指南:从五条规则到插入删除修复的图解与代码

2026-5-15 11:12:40

技术教程

WebSocket 实时通信完全指南:从协议握手到生产部署

2026-5-15 11:12:40

0 条回复 A文章作者 M管理员
    暂无讨论,说说你的看法吧
个人中心
购物车
优惠劵
今日签到
有新私信 私信列表
搜索