用 Go 实现一个轻量级事件总线,解耦智能工作流
做智能工作流系统时,最麻烦的是业务逻辑太散。如果每个节点(比如大模型调用、数据清洗、发通知)都直接硬编码调用,代码很快就会纠缠在一起。与其在核心业务里写满if-else和同步调用,不如在中间加一层事件总线。让节点通过发布 - 订阅模式异步通信,后续加新功能时,不用动旧代码。
一、为什么同步调用会卡住系统
传统写法通常是一个主函数按顺序调完所有模块。比如工单处理完了,顺便调个接口把摘要发到企业微信。一旦要加新功能,就得打开核心文件改代码。
这种写法有两个硬伤:
- 耦合太重:违背了开闭原则,改一个地方可能影响全局。
- 阻塞风险:下游通知模块如果网络超时,会直接拖慢主业务的响应速度。
对于长周期的 AI 任务,通常需要支持异步回调和插件热插拔。如果没有一个抽象的事件中枢,代码很容易变成难以维护的“意大利面”。
二、事件驱动的工作流逻辑
解耦的核心思路是:节点执行完后,把结果扔进事件总线,不用管谁在听。
架构逻辑大致如下:
- 发布者(如工单节点)发布
order.created事件。 - 事件总线收到后,分发给所有订阅该主题的节点。
- 订阅者(如分类器、存储库、通知服务)处理完后,如果需要,可以继续发布新事件(如
ticket.processed)。 - 插件管理器可以动态注册或注销订阅者,无需重启服务。
新增业务节点时,只需要向总线注册订阅,完全不用修改发布者的代码。
三、Go 语言实现:轻量级并发事件总线
为了保持单机高性能,这里用 Go 标准库实现了一个简单的并发安全事件总线。它支持基于 Topic 的发布订阅、带缓冲的 Channel、以及安全的动态注销。没有引入 RabbitMQ 或 Kafka 等外部中间件,适合单机场景。
package bus import ( "context" "errors" "sync" ) // Event 结构体定义了传递的数据实体 type Event struct { Topic string Data interface{} } // EventChannel 是订阅者接收事件的通道 type EventChannel chan Event // EventBus 管理所有的主题订阅关系 type EventBus struct { subscribers map[string][]EventChannel mu sync.RWMutex } // NewEventBus 初始化事件总线 func NewEventBus() *EventBus { return &EventBus{ subscribers: make(map[string][]EventChannel), } } // Subscribe 注册对某个 Topic 的订阅,返回一个接收通道 func (eb *EventBus) Subscribe(topic string, bufferSize int) EventChannel { eb.mu.Lock() defer eb.mu.Unlock() ch := make(EventChannel, bufferSize) eb.subscribers[topic] = append(eb.subscribers[topic], ch) return ch } // Unsubscribe 注销订阅,安全关闭通道并释放资源 func (eb *EventBus) Unsubscribe(topic string, ch EventChannel) error { eb.mu.Lock() defer eb.mu.Unlock() subs, ok := eb.subscribers[topic] if !ok { return errors.New("topic not found") } for i, sub := range subs { if sub == ch { // 安全关闭通道防止向已关闭通道发送导致的 panic close(ch) // 从切片中移除该通道 eb.subscribers[topic] = append(subs[:i], subs[i+1:]...) return nil } } return errors.New("subscriber channel not found in this topic") } // Publish 异步广播事件,支持 context 超时控制防止下游阻塞拖垮总线 func (eb *EventBus) Publish(ctx context.Context, event Event) { eb.mu.RLock() subs, ok := eb.subscribers[event.Topic] eb.mu.RUnlock() if !ok { return // 当前没有订阅者 } var wg sync.WaitGroup for _, sub := range subs { wg.Add(1) go func(ch EventChannel) { defer wg.Done() select { case <-ctx.Done(): // 上下文超时或被撤销,放弃发送防止 goroutine 挂起泄漏 return case ch <- event: // 事件成功写入缓冲通道 } }(sub) } // 等待本次广播的所有并发分发 goroutine 结束 wg.Wait() }四、实际落地时的几个坑
直接用事件总线虽然方便,但有几个问题需要自己处理:
- 顺序问题:并发处理时,后发布的事件可能先执行完。如果业务强依赖顺序(比如先扣款后发货),需要在 Event 里加序列号,让订阅者自己排序。
- 背压(Backpressure):如果某个订阅者(比如调大模型 API)处理太慢,它的 Channel 缓冲区会满。这时候需要设置合理的 buffer 大小,或者在 Publish 时加超时控制,直接丢弃或重试,别让慢节点拖死整个系统。
- 消息丢失:被丢弃或消费失败的事件不能直接消失。建议做个死信队列(Dead Letter Queue),把失败的消息存起来,方便后续人工排查或重放。
五、小结
这套方案的核心就是“解耦”。在智能工作流底层加一层并发安全的发布 - 订阅机制,配合超时控制,能大幅降低代码的维护成本。后续加新功能时,只需要注册新订阅者,不用动老代码,系统扩展起来也顺手。