当前位置: 首页 > news >正文

消息队列(MQ)(待完善)

消息队列

1.什么是消息队列?

消息列队(Message Queue,简称 MQ)是一个异步通讯机制,生产者和消费者只需通过消息队列就能够进行通讯。


2.消息队列有什么作用?

异步

生产者将消息发送到消息队列后,不需要等待消费者处理完成,就可以去处理其他任务

削峰

将大量请求暂时存在消息列队中,由服务器根据自己的能里去慢慢消费,避免大量请求压垮服务器。

  • MySQL 库存服务的处理能力只有5000 QPS(每秒最多扣 5000 次库存),当10万请求MySQL数据库时,可以先将请求暂存到MQ,后续根据自己数据库真实的抗压能力再去慢慢消费。

解耦

上下游系统不再直接硬调用,而是通过消息队列中转,一方发消息、一方消费,双方不需要进行通讯

场景:订单系统下单后,需通知库存、物流、积分、短信等多个服务。

  • 不用 MQ:订单服务依次调用所有下游,下游宕机 / 改接口,订单服务跟着受影响,耦合极强。
  • 用 MQ:订单只发一条消息到队列,各服务自行监听消费。新增 / 下线服务,无需改动订单代码

3.消息队列有什么缺点?

系统复杂度变高:由原来的A 系统直接调 B 系统,到现在的A系统到MQ,再到B系统。当MQ宕机、网络故障时,整个业务链路直接瘫痪。

数据一致性问题:MQ 是异步的,A系统发送消息成功,但B系统可能会处理失败

系统的开发难度上升:原本很简单的一个方法调用,变成了需要额外编写消息发送、接收、重试、幂等逻辑,还需要处理消息丢失、重复、乱序等问题。


4.消息队列如何选型?

没有最好的 MQ,只有最匹配业务的 MQ

对比维度KafkaRocketMQRabbitMQActiveMQ
开发 / 维护开源,Apache,社区活跃开源,Apache,阿里维护,国内生态强开源,社区成熟,国外主流开源,Apache,逐步衰退,更新慢
底层语言Scala/JavaJavaErlangJava
单机吞吐量百万级 / 秒(最高)十万级 / 秒万级 / 秒千级 / 秒(最低)
消息延迟毫秒级毫秒级微秒~低毫秒(最低延迟)中等毫秒
可靠性高(副本、持久化)极高(事务、重试、死信、HA 完善)高(持久化、镜像队列)一般,高并发易丢消息
消息堆积擅长海量堆积支持大规模堆积堆积能力弱,易卡顿堆积极易出问题
适用场景日志采集、大数据流、监控、海量流式消息电商、金融、业务解耦、事务消息、高可靠业务实时业务、路由复杂、跨协议、小消息低延迟老旧系统兼容、低并发传统项目

比如要支持淘宝双十一类超大型的秒杀活动,看重吞吐量。优先选Kafka和RocketMQ这种更高吞吐的

比如一个金融类业务,那么重点考虑的就是稳定性、安全性,分布式部署的Kafka和Rocket就更有优势。


5.消息队列的模型有哪些?

队列模型

在队列模型中,消息从生产者发送到队列,并且每条消息只能被一个消费者消费一次。消费之后,消息在队列中被删除。

主题模型 (发布/订阅模型)

在主题模型中,消息的生产者称为发布者(Publisher),消息的消费者称为订阅者(Subscriber),存放消息的容器称为主题(Topic)。发布者将消息发送到指定主题中,订阅者需要提前订阅主题才能接受特定主题的消息

RocketMQ

1.RocketMQ的消息模型

RabbitMQ中的消息模型就是按照“主题模型”所实现的。主要分为:

  • Producer(生产者):发消息到 TopicRocketMQ。
  • Broker(服务端):存消息、转发消息
  • Consumer(消费者):从 Topic 收消息RocketMQ。
  • Topic(主题):消息的逻辑分类

2.RocketMQ的架构

RocketMQ 技术架构分为Broker、NameServer、Producer、Consumer

Broker消息的实际载体,用于存储消息。

NameServer:类似于注册中心。

  • 动态存储Broker将元数据。如:Broker 的名称、Broker 的网络地址、Topic 路由信息、存活状态标识
  • 为生产者 / 消费者提供 Topic 路由信息。也就是告诉生产者和消费者某个Topic存在哪个Broker中。

Producer:生产者,负责消息发送到Broker

Consumer :消费者,负责订阅消息和消费消息


3.RocketMQ整体工作流程

  1. Broker启动时会向所有NameServer建立常连接,每30秒发送一次心跳
  2. Producer发送消息时,从NameServer获取Broker的地址,然后向Broker发送消息
  3. Consumer消费消息时,从NameServer获取Broker的地址,然后从Broker拉取消息

4.Broker和NameServer的心跳机制

1.为什么需要心跳机制?

NameServer无法感知Broker的存活状态,生产者和消费者会连接到无效的Broker,会导致消息发送失败等问题。

2.什么是心跳机制?

Broker每30秒会给NameServer发送一次心跳,告诉NameServer自己还活着。

NameServer默认每隔10 秒执行一次任务,去扫描每个Broker最后一次发送的心跳时间,如果一个Broker心跳时间与当前时间距离120秒,就会认为Broker 已经宕机。


5.RocketMQ消息发送模式

同步发送:生产者发送消息后,会阻塞当前线程执行,直到Broker返回明确的成功或失败消息,线程才能够继续执行。

异步发送:生产者发送消息后,立即返回,不需要等待Broker响应,后续Broker处理消息完成后,通过回调函数通知生产者。

单向发送:生产者发送消息后立即返回,不关心Broker是否接受到消息


6.RocketMQ的刷盘机制

异步刷盘:消息存入Broker内存后,Broker立即返回ACK给到生产者,之后再进行刷盘

同步刷盘:消息存入到Broker内存并进行刷盘后,Broker立即返回ACK给到生产者


7.RocketMQ如何保证消息不丢失?

消息丢失只出现在三个阶段:消息生产阶段、消息存储阶段、消息消费阶段,所以要保证消息就是保证三个环节都不能丢失数据。

消息生产阶段

丢失原因

因为生产者在将消息发送到Broker时,可能因为网络波动问题导致消息未成功发送到Broker 。

解决方法

可以使用同步发送模式,Broker收到消息时才会返回一个ACK给到生产者,如果Broker没有收到消息,生产者会重试发送消息,这样就保证了消息不丢失。

消息存储阶段

丢失原因

消息刚放内存,还没写硬盘,Broker 宕机, 消息没了

解决方法

Broker采用同步刷盘策略,消息只有在持久化到磁盘后,Broker才会返回ACK给到生产者。

消息消费阶段

丢失原因

消费者消息刚拿到,还没处理完,就告诉 Broker “我消费完了”。结果消费者宕机,消息丢失

解决办法

关闭自动提交,改为手动提交,当消费者收到消息并处理完业务逻辑后,手动提交ACK到Broker。


8.RocketMQ如何处理消息重复问题?

1.为什么会重复?

1、生产者发送消息后,由于网络波动,生产者未能收到确认响应,导致消息被重复发送。

2、消费者在处理消息后,由于系统崩溃或网络问题,未及时向MQ中间件发送 ACK,MQ重新发送消息

2.解决办法

为了避免重复消息带来的副作用,关键在于使消费者的处理逻辑具有幂等性幂等性是指一个操作无论执行多少次,产生的结果都是一样的。

如何保证幂等性?

  • 消息唯一标识(ID):为每条消息引入全局唯一ID(如UUID、订单号等),利用Redis或数据库来存储已处理过的消息ID。消费者在处理消息时,先检查该消息ID是否已存在,如果存在则说明消息已被处理过。如果不存在,把 ID 存入 Redis / 数据库。
  • 数据库唯一索引:给业务字段建立唯一索引,重复消息插入就会失败

9.RocketMQ如何处理消息堆积问题?

消息积压是因为生产者的生产速度,大于消费者的消费速度。

解决办法

  1. 先定位消费慢的原因,如果是bug则处理bug,如消费线程发生死锁、线程池中消费者线程数配置不合理
  2. 如果是因为消费者本身消费能力较弱,则优化消费者代码逻辑,比如减少单个消息处理时间、使用批量处理消息
  3. 如果代码逻辑也优化了还是慢,则可以增加消费者的线程数量,提高消费能力,或者进行扩容,增加多个消费者,提高消费能力。(增加多个消费者:不同机器 /不同虚拟服务器/同一服务器的不同进程 / 端口)
  4. 对生产者进行限流,控制生产者发送消息的速度
  5. 消费者降级,对低优先级的消息进行丢弃或延迟处理。

10.RocketMQ如何保证消息的有序性?

1.为什么要保证消息的有序性?

消息的有序性是指消息的消费顺序能够与消息的发送顺序一致。例如:一个订单产生了3条消息,分别是订单创建、订单付款和订单完成。在消息消费时,同一条订单要严格按照这个顺序进行消费,否则业务会发生混乱。

2.如何保障消息有序性的?

局部顺序:RocketMQ 按照业务划分成不同的队列,不同业务的队列并发消费,同一业务中的队列按照发送消息的顺序消费

  • 场景:订单、物流、交易等不同业务并行执行,同一个业务内部必须严格有序。比如同一个订单的 创建→付款→发货→完成 必须按顺序执行。

全局顺序:RocketMQ将所有业务消息放入到同一个队列中,消费时严格遵循发送先后顺序逐条执行

  • 场景:订单、物流、交易等所有业务消息按发送顺序,全部存入同一个队列。


11.RocketMQ如何实现延迟消息的?

1.什么是延迟消息?

延迟消息是指将消息发送到MQ后,不会立即发送给消费者,而是设定一个定时时间,当时间一到就才会发送。(如在电商平台下单后 30 分钟未支付,“订单已取消” 的信息就会发送给消费者)


2.如何实现延迟消息?

一、先记住:RocketMQ 不支持 “任意时间” 延迟,只能选默认的18 个固定级别

1s、5s、10s、30s、 1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、 1h、2h

二、实现整体流程

Broker 接收延迟消息后,存入到延迟队列中,定时调度服务轮询检查延迟队列,当消息到期时,Broker 将消息放入到正常队列中,消费者开始正常消费。


12.RocektMQ怎么处理分布式事务?

1.什么是分布式事务?

指一个业务操作需要跨 2 个及以上独立服务 / 数据库 / 服务器才能够完成,而且这个业务操作要么全部成功,要么全部失败。

  • 场景:A 服务给 B 服务转 100块钱,A服务减100,B服务加100。两个服务要么一起成功,还要么失败

2.如何来保证事务的一致性?

  1. 生产者向 Broker发送半消息 (半消息:消息成功发送给Broker 端,但消息被标记为不可投递状态,消费者无法消费)
  2. 生产者收到Broker成功响应的话,生产者开始执行本地事务
  3. 生产者根据本地事务执行结果,向Broker发送提交或回滚
  4. 如果发送的是提交,Broker将消息标记为正常消息,消费者可以正常消费
  5. 如果发送的是回滚,Broker将消息删除
  6. 如果Broker没收到二次确认,Broker 会主动回查生产者的事务执行结果
  7. 再根据结果进行提交处理或者回滚处理


13.死信队列?

死信队列是专门存储哪些无法被正常消费的消息

当一条消息初次被消费者消费失败时,RocketMQ会自动重试消费消息;当达到最大重试次数后,依然消费失败,则将消息放入到死信队列中,等待人工干预处理。

http://www.zskr.cn/news/1495790.html

相关文章:

  • ERP系统对中小型企业的六大优势
  • 从数据手册到实战:K40微控制器时钟与ADC电气特性深度解析
  • 索尼相机终极解锁指南:用OpenMemories-Tweak释放隐藏功能,3步告别30分钟录制限制
  • 2026杭州市钱塘区家里卫生间漏水、阳台漏水、楼顶漏水、阳台漏水、地下室渗水、阳光房漏水各种房屋漏水情况不用愁!本地防水补漏公司为您排忧解难!精准推荐附近专业防水团队 - 防水百科
  • HarmonyOS ArkUI 深度响应:@Observed 与 @ObjectLink 完全指南
  • MailCore IMAP实战教程:10个技巧高效管理邮件文件夹和消息
  • 2026杭州市富阳区家里卫生间漏水、阳台漏水、楼顶漏水、阳台漏水、地下室渗水、阳光房漏水各种房屋漏水情况不用愁!本地防水补漏公司为您排忧解难!精准推荐附近专业防水团队 - 防水百科
  • KeymouseGo终极指南:三步掌握免费开源鼠标键盘自动化工具
  • 2026北京市昌平区家里卫生间漏水、阳台漏水、楼顶漏水、阳台漏水、地下室渗水、阳光房漏水各种房屋漏水情况不用愁!本地防水补漏公司为您排忧解难!精准推荐附近专业防水团队 - 防水百科
  • 遗传算法进阶核心:选择压力、适应度缩放与精英策略实战解析
  • 如何用Point-E在5分钟内从文本生成3D点云?完整实战指南
  • GIS局部放电在线监测:让电网隐患“无处遁形”
  • 题解:学而思编程 斐波那契字符串
  • 华硕笔记本终极性能调校指南:G-Helper完整教程
  • Zerolang社区贡献指南:如何参与这个革命性编程语言的开发
  • 3步解决老旧Mac无法升级macOS问题:OpenCore Legacy Patcher终极指南
  • 别再手动调格式了!用NoteExpress搞定毕业论文参考文献(附样式修改避坑指南)
  • 如何快速掌握Wasmtime:WebAssembly运行时完整指南
  • 3PEAK思瑞浦 TPA5521-S5TR SOT23-5 运算放大器
  • 华硕笔记本性能调节终极指南:5分钟掌握G-Helper轻量级控制神器
  • 实测12款论文降AIGC网站,效果最好的竟然是它!
  • 面向对象的三大特性
  • CouchApp与CouchDB集成:如何创建高效的数据驱动Web应用的7个步骤
  • 终极暗黑2存档编辑器:免费网页工具让D2/D2R存档编辑变得简单快速
  • py之socket ssl双向认证代码(亲测好用)
  • LLMxMapReduce未来展望:多模态支持、实时处理与分布式计算的演进方向
  • 神经渲染:打开宇宙的“数字之眼”——天文可视化的新范式
  • FGO-py:让你的Fate/Grand Order游戏体验焕然一新的智能管家
  • Qbot量化交易框架深度解析:从本地部署到智能策略实战验证
  • Python 爬虫项目 基于 Redis 实现爬虫 IP 代理池搭建与动态代理轮换