当前位置: 首页 > news >正文

【Redis从入门到精通】第54篇:发布订阅实战——实时消息推送、聊天室、事件通知

上一篇【第53篇】Redis发布订阅——消息队列的轻量替代方案
下一篇【第55篇】Redis事务——MULTI/EXEC/DISCARD/WATCH详解


上一篇我们搞懂了Redis Pub/Sub的内部原理,今天轮到真刀真枪了。我们会用实际代码实现几个经典场景,再跟Kafka、RabbitMQ来一场"正面PK",最后说说在Cluster模式下用Pub/Sub需要注意的那些坑。


实战一:聊天室

最经典的Pub/Sub应用场景就是聊天室。频道就是聊天室,订阅就是加入聊天室,发布就是发消息。

Python 实现

importredisimportthreadingimporttime r=redis.Redis(host='localhost',port=6379,decode_responses=True)defchat_listener(username,room):"""监听聊天室消息的线程"""pubsub=r.pubsub()pubsub.subscribe(room)print(f"[{username}] 已加入聊天室:{room}")print(f"[{username}] 等待消息中... (Ctrl+C 退出)")try:formessageinpubsub.listen():ifmessage['type']=='message':sender,content=message['data'].split(':',1)ifsender!=username:print(f"\r[{room}]{sender}:{content}")print(f"[{username}] > ",end='',flush=True)exceptKeyboardInterrupt:print(f"\n[{username}] 正在离开聊天室...")finally:pubsub.unsubscribe(room)pubsub.close()defsend_message(username,room):"""发送消息的循环"""whileTrue:try:msg=input(f"[{username}] > ")ifmsg.strip().lower()=='/quit':breakfull_msg=f"{username}:{msg}"r.publish(room,full_msg)except(EOFError,KeyboardInterrupt):breakif__name__=='__main__':importsys username=sys.argv[1]iflen(sys.argv)>1else'匿名用户'room=sys.argv[2]iflen(sys.argv)>2else'general'# 启动监听线程listener=threading.Thread(target=chat_listener,args=(username,room),daemon=True)listener.start()# 主线程负责发送消息send_message(username,room)print(f"[{username}] 已退出聊天室")

运行效果:

# 终端1:$ python chat.py 张三 general[张三]已加入聊天室: general[张三]等待消息中...[张三]>大家好![general]李四: 张三好!# 终端2:$ python chat.py 李四 general[李四]已加入聊天室: general[李四]等待消息中...[general]张三: 大家好![李四]>张三好!

私聊功能

私聊就是用用户ID作为频道名

defsend_private_message(sender,receiver,content):"""发送私聊消息"""channel=f"private:{receiver}"message=f"{sender}:{content}"# 返回0说明用户不在线receivers=r.publish(channel,message)ifreceivers==0:print(f"[系统] 用户{receiver}不在线,消息已丢失")deflisten_private_messages(username):"""监听私聊消息"""channel=f"private:{username}"pubsub=r.pubsub()pubsub.subscribe(channel)formessageinpubsub.listen():ifmessage['type']=='message':print(f"\r[私聊]{message['data']}")

实战二:实时消息推送

在实际业务中,最常见的场景是推送通知——比如订单状态变更、系统告警等。

实时推送架构 ┌──────────┐ 订单创建 ┌──────────┐ PUBLISH ┌──────────┐ │ 订单服务 │ ──────────→ │ Redis │ ──────────→ │ 用户连接 │ │ │ order:123 │ │ │ WebSocket │ └──────────┘ │ order:123 │ │ SSE │ └──────────┘ │ Long Poll │ └──────────┘ 频道命名规范: user:{userId}:notifications → 用户通知 order:{orderId}:events → 订单事件 system:alerts → 系统告警 realtime:stock:{code} → 实时股价

Java + Spring Data Redis 实现

@ConfigurationpublicclassRedisPubSubConfig{@BeanRedisMessageListenerContainercontainer(RedisConnectionFactoryconnectionFactory,MessageListenerAdapterlistenerAdapter){RedisMessageListenerContainercontainer=newRedisMessageListenerContainer();container.setConnectionFactory(connectionFactory);// 订阅用户通知频道(使用模式订阅)container.addMessageListener(listenerAdapter,newPatternTopic("user.*:notifications"));returncontainer;}@BeanMessageListenerAdapterlistenerAdapter(NotificationListenerlistener){returnnewMessageListenerAdapter(listener);}}@ComponentpublicclassNotificationListenerimplementsMessageListener{// 注入 WebSocket 推送服务@AutowiredprivateWebSocketPushServicepushService;@OverridepublicvoidonMessage(Messagemessage,byte[]pattern){Stringchannel=newString(message.getChannel(),StandardCharsets.UTF_8);Stringbody=newString(message.getBody(),StandardCharsets.UTF_8);// 从频道名提取用户ID: "user:10086:notifications" → "10086"StringuserId=channel.split(":")[1];// 通过WebSocket推送给用户pushService.sendToUser(userId,body);System.out.printf("通知推送: 用户=%s, 消息=%s%n",userId,body);}}@ServicepublicclassOrderService{@AutowiredprivateStringRedisTemplateredisTemplate;publicvoidcreateOrder(StringorderId,StringuserId){// ... 创建订单逻辑 ...// 发布订单事件通知Stringchannel="user:"+userId+":notifications";Stringnotification=String.format("{\"type\":\"order_created\",\"orderId\":\"%s\"}",orderId);redisTemplate.convertAndSend(channel,notification);}}

实战三:与Keyspace通知结合的事件系统

Redis的Keyspace通知可以把键的变化事件通过Pub/Sub广播出来:

importredisimportjsonimportthreading r=redis.Redis(host='localhost',port=6379,decode_responses=True)# 开启键事件通知r.config_set('notify-keyspace-events','KEA')defevent_listener():"""监听所有键事件"""pubsub=r.pubsub()# 订阅db0的所有事件pubsub.psubscribe('__keyevent@0__:*')print("事件监听器已启动...")formessageinpubsub.listen():ifmessage['type']=='pmessage':event=message['channel'].split(':')[-1]key=message['data']print(f"[事件]{event}→ key:{key}")# 根据事件类型做不同处理ifevent=='expired':handle_key_expired(key)elifevent=='set':handle_key_set(key)elifevent=='del':handle_key_deleted(key)defhandle_key_expired(key):"""处理key过期事件"""print(f" → Key过期:{key}")# 可以触发缓存刷新、超时处理等逻辑defhandle_key_set(key):"""处理key设置事件"""print(f" → Key被设置:{key}")defhandle_key_deleted(key):"""处理key删除事件"""print(f" → Key被删除:{key}")# 启动监听thread=threading.Thread(target=event_listener,daemon=True)thread.start()

踩坑提示:Keyspace通知的事件不保证精确送达。特别是过期事件,Redis使用惰性删除 + 定期删除两种策略,过期事件的触发时机可能延迟几秒甚至更久。不要用它做精确的超时控制,比如"30秒后自动取消订单"——用Timer或延迟队列更靠谱。


发布订阅 vs 专业消息队列

很多同学会把Redis Pub/Sub当成消息队列来用。它确实可以做消息传递,但跟专业消息队列比起来,差距还是明显的。

全面对比表格

对比维度Redis Pub/SubRedis StreamKafkaRabbitMQ
消息持久化不持久化持久化持久化(磁盘)持久化
离线消费不支持支持支持支持
消息回溯不支持支持支持(offset)有限
ACK确认有(XACK)有(手动/自动)有(手动/自动)
消息堆积不支持支持支持(大量)支持
消息顺序有序有序分区有序队列有序
消费组不支持支持支持支持
延迟极低(<1ms)低(几ms)
吞吐量极高
消息不丢保证不保证保证保证保证
运维复杂度
功能丰富度

选型建议

消息系统选型决策 需要消息持久化/ACK/堆积? ├─ 否 ──→ 消息量大小? │ ├─ 小(<1万/秒)──→ Redis Pub/Sub ✓ │ └─ 大(>1万/秒)──→ Redis Pub/Sub(注意缓冲区) │ └─ 是 ──→ 团队有运维Kafka能力? ├─ 是 ──→ 数据量/吞吐量很大? │ ├─ 是 ──→ Kafka ✓ │ └─ 否 ──→ RabbitMQ / Redis Stream └─ 否 ──→ Redis Stream ✓(最简单的持久化消息方案)

Redis Stream:Pub/Sub的持久化升级版

如果你觉得Pub/Sub的消息丢失问题太严重,但又不想引入Kafka的复杂度,可以看看Redis 5.0引入的Redis Stream

# 创建消费者组(类似Kafka的消费组)XGROUP CREATE mystream mygroup $ MKSTREAM# 生产者发送消息XADD mystream * user:张三 action:login timestamp:1700000000# 返回: "1700000000000-0"# 消费者读取消息XREADGROUP GROUP mygroup consumer1 COUNT1STREAMS mystream># 确认消息处理完成(ACK)XACK mystream mygroup1700000000000-0

Redis Stream vs Pub/Sub 核心区别:

Pub/Sub: Stream: PUBLISH ──→ [Redis] ──→ 在线订阅者 ↓ 消息消失 离线订阅者 → 什么也收不到 VS XADD ──→ [Stream] ──→ 在线消费者 ↓ 消息持久化 离线消费者 → 上线后可以消费历史消息

Java 中使用 Lettuce 实现发布订阅

Lettuce 原生Pub/Sub

publicclassLettucePubSubExample{publicstaticvoidmain(String[]args){RedisClientclient=RedisClient.create("redis://localhost:6379");StatefulRedisConnection<String,String>connection=client.connect();// 开启Pub/Sub需要单独的连接StatefulRedisPubSubConnection<String,String>pubsubConnection=client.connectPubSub();// 添加消息监听器pubsubConnection.addListener(newRedisPubSubAdapter<String,String>(){@Overridepublicvoidmessage(Stringchannel,Stringmessage){System.out.printf("收到消息 [%s]: %s%n",channel,message);}@Overridepublicvoidsubscribed(Stringchannel,longcount){System.out.printf("已订阅频道: %s (当前订阅数: %d)%n",channel,count);}});// 异步订阅RedisPubSubAsyncCommands<String,String>async=pubsubConnection.async();async.subscribe("news","sports","weather");// 发布消息(使用普通连接)RedisAsyncCommands<String,String>commands=connection.async();commands.publish("news","今天是个好日子").thenAccept(receivers->{System.out.printf("消息已发布,%d个订阅者收到%n",receivers);});// 保持运行try{Thread.sleep(10000);}catch(InterruptedExceptione){}}}

多实例部署下的 Pub/Sub 注意事项

Cluster 模式下的 Pub/Sub 限制

在Cluster模式下,Pub/Sub有一个重要的限制:PUBLISH命令只会在消息所在分片的节点上广播

Cluster 模式下 Pub/Sub 的问题 集群有3个分片: 分片1 (Master-A): 槽 0-5460 分片2 (Master-B): 槽 5461-10922 分片3 (Master-C): 槽 10923-16383 Subscriber-1 连接到 Master-A,订阅 "news" Subscriber-2 连接到 Master-B,订阅 "news" Subscriber-3 连接到 Master-C,订阅 "news" Publisher 连接到 Master-A,执行 PUBLISH news "hello" 结果: → Subscriber-1 收到消息 ✓ (在Master-A上) → Subscriber-2 没收到 ✗ (在Master-B上,不在同一个分片) → Subscriber-3 没收到 ✗ (在Master-C上,不在同一个分片)

踩坑提示:这是Cluster模式下使用Pub/Sub最大的坑!很多同学在开发测试环境(Standalone)没问题,上了集群就发现消息丢失,就是因为这个原因。

解决方案:Sharded Pub/Sub(Redis 7.0+)

Redis 7.0引入了Sharded Pub/Sub,让消息在同一个分片的节点之间正确广播:

# Redis 7.0+ 新增命令SSUBSCRIBE channel[channel...]# 分片订阅SUNSUBSCRIBE[channel...]# 分片退订SPUBLISH channel message# 分片发布

Sharded Pub/Sub的特点:

  • 频道被映射到具体的分片(类似Key的槽位映射)
  • 只有在同一个分片上的订阅者才会收到消息
  • 适合需要分片内广播的场景

对于全集群广播的需求,Redis 7.0之前没有好方案,Redis 7.0的普通Pub/Sub仍然只在本地分片有效。如果确实需要全集群广播,可以考虑:

全集群广播方案 方案1: 在每个分片的Master上都执行一次PUBLISH ┌──────────────┐ │ App Server │ │ │── PUBLISH → Master-A (分片1) │ │── PUBLISH → Master-B (分片2) │ │── PUBLISH → Master-C (分片3) └──────────────┘ 方案2: 使用外部消息中间件(如Kafka)做全局广播 ┌──────────────┐ ┌─────────┐ ┌──────────────┐ │ App Server │ ──→ │ Kafka │ ──→ │ 各分片Redis │ └──────────────┘ └─────────┘ └──────────────┘

Pub/Sub 的性能考量

客户端输出缓冲区

当订阅者处理消息的速度跟不上发布速度时,消息会在客户端的输出缓冲区中堆积。如果缓冲区超过限制,Redis会强制断开这个客户端。

# 查看客户端配置CONFIG GET client-output-buffer-limit# 默认值:# 1) "client-output-buffer-limit"# 2) "normal 0 0 0 slave 268435456 67108864 60 pubsub 33554432 8388608 60"## pubsub类型:# hard limit: 32MB (33554432)# soft limit: 8MB (8388608), 持续60秒# 调整Pub/Sub客户端的缓冲区限制CONFIG SET client-output-buffer-limit"pubsub 64mb 16mb 120"

高频发布的影响

# 测试: 10万QPS的PUBLISH# 在一个终端:$ redis-benchmark-tpublish-P100-c50-n100000# 结果: ~110000 requests/sec# 但如果订阅者处理慢:# → 输出缓冲区持续增长# → 最终触发断连# → 订阅者被踢掉,消息全部丢失

性能最佳实践

建议说明
保持订阅者处理速度快于发布速度最重要的一点
合理设置client-output-buffer-limit根据消息大小和预期QPS调整
使用连接池管理Pub/Sub连接不要和常规命令混用同一个连接
监控Pub/Sub相关指标pubsub_channels数量、模式订阅数量
大量频道时避免模式订阅模式订阅是O(N)遍历,频道多时性能差

本章小结

Redis Pub/Sub是一个"够用就好"的轻量级消息方案:

场景推荐方案原因
实时通知推送Pub/Sub延迟低,实现简单
聊天室Pub/Sub天然适合
实时配置更新Pub/Sub+ Keyspace通知零额外组件
任务队列Redis Stream / 消息队列需要持久化和ACK
事件溯源Kafka需要消息回溯和长期存储
大流量消息总线Kafka需要高吞吐和持久化

记住一个原则:Pub/Sub是通知机制,不是消息队列。用它做"实时通知",而不是"可靠投递"。如果业务对消息丢失零容忍,请果断选择Redis Stream或Kafka。


上一篇【第53篇】Redis发布订阅——消息队列的轻量替代方案
下一篇【第55篇】Redis事务——MULTI/EXEC/DISCARD/WATCH详解


http://www.zskr.cn/news/1461068.html

相关文章:

  • 告别复杂配置:用快马AI一键生成你的第一个LaTeX学术论文模板
  • 归并排序(递归代码)
  • 石家庄黄金回收找哪家?这五家正规门店免费上门,久美30年零差评 - 行行星
  • 【Redis从入门到精通】第55篇:Redis事务——MULTI/EXEC/DISCARD/WATCH详解
  • 基于树莓派与OpenCV的实时人脸识别系统:从硬件搭建到算法部署全流程
  • 96110是什么电话?新流派带你了解反诈专线背后的秘密
  • 2026国产数据库全景图:按架构、按行业、按能力三维度一表选型
  • VOCs检测车监控管理平台解决方案
  • 告别pip install失败:手把手教你搞定Python Click的离线安装(附国内镜像源大全)
  • 生成式智能搜索下的流量卡位攻略:初创个体如何甄选高兼容性的 GEO 优化 服务商
  • 高并发服务器必备:小根堆定时器从设计到实现全流程
  • 解密NomNom存档编辑器:三步搞定JSON导出异常问题
  • Python量化交易实战:如何用jqktrader构建高效自动化交易系统
  • 2026年汉中市口碑首选!黄金回收铂金回收白银回收权威门店 TOP5 附咨询电话 - 信誉隆金银铂奢回收
  • TCC-G15终极指南:快速掌控Dell笔记本散热性能的完整方案
  • 实战指南:Python自动化获取B站数据全流程
  • 2026年宝鸡市黄金回收白银回收铂金回收门店 TOP5榜单无套路:实体店铺地址电话一览 - 诚金汇钻回收公司
  • 如何用Python构建同花顺自动化交易系统:jqktrader技术深度解析
  • 别再死记硬背网表了!用HSPICE和Spectre仿真MOSFET时,这3个参数设置错了等于白跑
  • 2026年怀化市口碑首选!黄金回收铂金回收白银回收权威门店 TOP5 附咨询电话 - 信誉隆金银铂奢回收
  • 2026年显微硬度计哪家强?精密硬度检测设备推荐上海钜惠仪器 - 品牌推荐大师1
  • 吐司:自然语言生成App的首选零代码平台
  • 如何高效使用HLS下载器:浏览器流媒体下载的终极解决方案
  • Windows Btrfs驱动完全指南:如何实现跨平台文件系统的终极支持
  • 广元市2026年黄金回收白银回收铂金回收权威门店 TOP5+正规可靠机构电话与地址汇总 - 中安检金银铂钻回收
  • AI赋能开发:探索如何用快马AI模型智能生成并增强qclow官网
  • 2026年山东面粉加工设备与豆类磨粉机源头厂家深度选购指南 - 企业名录优选推荐
  • 10分钟精通语雀文档完整导出:零代码迁移实战指南
  • 青岛十几年老店收翡翠,不靠套路赚差价,靠实价留回头客 - 奢侈品交易观察员
  • 2026雨水收集模块厂家推荐:MEA米亚百年技术赋能水资源管理 - 品牌排行榜