消息队列(MQ)(待完善)
消息队列
1.什么是消息队列?
消息列队(Message Queue,简称 MQ)是一个异步通讯机制,生产者和消费者只需通过消息队列就能够进行通讯。
2.消息队列有什么作用?
异步
生产者将消息发送到消息队列后,不需要等待消费者处理完成,就可以去处理其他任务
削峰
将大量请求暂时存在消息列队中,由服务器根据自己的能里去慢慢消费,避免大量请求压垮服务器。
- MySQL 库存服务的处理能力只有5000 QPS(每秒最多扣 5000 次库存),当10万请求MySQL数据库时,可以先将请求暂存到MQ,后续根据自己数据库真实的抗压能力再去慢慢消费。
解耦
上下游系统不再直接硬调用,而是通过消息队列中转,一方发消息、一方消费,双方不需要进行通讯
场景:订单系统下单后,需通知库存、物流、积分、短信等多个服务。
- 不用 MQ:订单服务依次调用所有下游,下游宕机 / 改接口,订单服务跟着受影响,耦合极强。
- 用 MQ:订单只发一条消息到队列,各服务自行监听消费。新增 / 下线服务,无需改动订单代码。
3.消息队列有什么缺点?
系统复杂度变高:由原来的A 系统直接调 B 系统,到现在的A系统到MQ,再到B系统。当MQ宕机、网络故障时,整个业务链路直接瘫痪。
数据一致性问题:MQ 是异步的,A系统发送消息成功,但B系统可能会处理失败
系统的开发难度上升:原本很简单的一个方法调用,变成了需要额外编写消息发送、接收、重试、幂等逻辑,还需要处理消息丢失、重复、乱序等问题。
4.消息队列如何选型?
没有最好的 MQ,只有最匹配业务的 MQ
| 对比维度 | Kafka | RocketMQ | RabbitMQ | ActiveMQ |
|---|---|---|---|---|
| 开发 / 维护 | 开源,Apache,社区活跃 | 开源,Apache,阿里维护,国内生态强 | 开源,社区成熟,国外主流 | 开源,Apache,逐步衰退,更新慢 |
| 底层语言 | Scala/Java | Java | Erlang | Java |
| 单机吞吐量 | 百万级 / 秒(最高) | 十万级 / 秒 | 万级 / 秒 | 千级 / 秒(最低) |
| 消息延迟 | 毫秒级 | 毫秒级 | 微秒~低毫秒(最低延迟) | 中等毫秒 |
| 可靠性 | 高(副本、持久化) | 极高(事务、重试、死信、HA 完善) | 高(持久化、镜像队列) | 一般,高并发易丢消息 |
| 消息堆积 | 擅长海量堆积 | 支持大规模堆积 | 堆积能力弱,易卡顿 | 堆积极易出问题 |
| 适用场景 | 日志采集、大数据流、监控、海量流式消息 | 电商、金融、业务解耦、事务消息、高可靠业务 | 实时业务、路由复杂、跨协议、小消息低延迟 | 老旧系统兼容、低并发传统项目 |
比如要支持淘宝双十一类超大型的秒杀活动,看重吞吐量。优先选Kafka和RocketMQ这种更高吞吐的
比如一个金融类业务,那么重点考虑的就是稳定性、安全性,分布式部署的Kafka和Rocket就更有优势。
5.消息队列的模型有哪些?
队列模型
在队列模型中,消息从生产者发送到队列,并且每条消息只能被一个消费者消费一次。消费之后,消息在队列中被删除。
主题模型 (发布/订阅模型)
在主题模型中,消息的生产者称为发布者(Publisher),消息的消费者称为订阅者(Subscriber),存放消息的容器称为主题(Topic)。发布者将消息发送到指定主题中,订阅者需要提前订阅主题才能接受特定主题的消息
RocketMQ
1.RocketMQ的消息模型
RabbitMQ中的消息模型就是按照“主题模型”所实现的。主要分为:
- Producer(生产者):发消息到 TopicRocketMQ。
- Broker(服务端):存消息、转发消息
- Consumer(消费者):从 Topic 收消息RocketMQ。
- Topic(主题):消息的逻辑分类
2.RocketMQ的架构
RocketMQ 技术架构分为Broker、NameServer、Producer、Consumer
Broker:消息的实际载体,用于存储消息。
NameServer:类似于注册中心。
- 动态存储Broker将元数据。如:Broker 的名称、Broker 的网络地址、Topic 路由信息、存活状态标识
- 为生产者 / 消费者提供 Topic 路由信息。也就是告诉生产者和消费者某个Topic存在哪个Broker中。
Producer:生产者,负责消息发送到Broker
Consumer :消费者,负责订阅消息和消费消息
3.RocketMQ整体工作流程
- Broker启动时会向所有NameServer建立常连接,每30秒发送一次心跳
- Producer发送消息时,从NameServer获取Broker的地址,然后向Broker发送消息
- Consumer消费消息时,从NameServer获取Broker的地址,然后从Broker拉取消息
4.Broker和NameServer的心跳机制
1.为什么需要心跳机制?
NameServer无法感知Broker的存活状态,生产者和消费者会连接到无效的Broker,会导致消息发送失败等问题。
2.什么是心跳机制?
Broker每30秒会给NameServer发送一次心跳,告诉NameServer自己还活着。
NameServer默认每隔10 秒执行一次任务,去扫描每个Broker最后一次发送的心跳时间,如果一个Broker心跳时间与当前时间距离120秒,就会认为Broker 已经宕机。
5.RocketMQ消息发送模式
同步发送:生产者发送消息后,会阻塞当前线程执行,直到Broker返回明确的成功或失败消息,线程才能够继续执行。
异步发送:生产者发送消息后,立即返回,不需要等待Broker响应,后续Broker处理消息完成后,通过回调函数通知生产者。
单向发送:生产者发送消息后立即返回,不关心Broker是否接受到消息
6.RocketMQ的刷盘机制
异步刷盘:消息存入Broker内存后,Broker立即返回ACK给到生产者,之后再进行刷盘
同步刷盘:消息存入到Broker内存并进行刷盘后,Broker立即返回ACK给到生产者
7.RocketMQ如何保证消息不丢失?
消息丢失只出现在三个阶段:消息生产阶段、消息存储阶段、消息消费阶段,所以要保证消息就是保证三个环节都不能丢失数据。
消息生产阶段
丢失原因
因为生产者在将消息发送到Broker时,可能因为网络波动问题导致消息未成功发送到Broker 。
解决方法
可以使用同步发送模式,Broker收到消息时才会返回一个ACK给到生产者,如果Broker没有收到消息,生产者会重试发送消息,这样就保证了消息不丢失。
消息存储阶段
丢失原因
消息刚放内存,还没写硬盘,Broker 宕机, 消息没了
解决方法
Broker采用同步刷盘策略,消息只有在持久化到磁盘后,Broker才会返回ACK给到生产者。
消息消费阶段
丢失原因
消费者消息刚拿到,还没处理完,就告诉 Broker “我消费完了”。结果消费者宕机,消息丢失
解决办法
关闭自动提交,改为手动提交,当消费者收到消息并处理完业务逻辑后,手动提交ACK到Broker。
8.RocketMQ如何处理消息重复问题?
1.为什么会重复?
1、生产者发送消息后,由于网络波动,生产者未能收到确认响应,导致消息被重复发送。
2、消费者在处理消息后,由于系统崩溃或网络问题,未及时向MQ中间件发送 ACK,MQ重新发送消息
2.解决办法
为了避免重复消息带来的副作用,关键在于使消费者的处理逻辑具有幂等性。幂等性是指一个操作无论执行多少次,产生的结果都是一样的。
如何保证幂等性?
- 消息唯一标识(ID):为每条消息引入全局唯一ID(如UUID、订单号等),利用Redis或数据库来存储已处理过的消息ID。消费者在处理消息时,先检查该消息ID是否已存在,如果存在则说明消息已被处理过。如果不存在,把 ID 存入 Redis / 数据库。
- 数据库唯一索引:给业务字段建立唯一索引,重复消息插入就会失败
9.RocketMQ如何处理消息堆积问题?
消息积压是因为生产者的生产速度,大于消费者的消费速度。
解决办法
- 先定位消费慢的原因,如果是
bug则处理bug,如消费线程发生死锁、线程池中消费者线程数配置不合理 - 如果是因为消费者本身消费能力较弱,则优化消费者代码逻辑,比如减少单个消息处理时间、使用批量处理消息
- 如果代码逻辑也优化了还是慢,则可以增加消费者的线程数量,提高消费能力,或者进行扩容,增加多个消费者,提高消费能力。(增加多个消费者:不同机器 /不同虚拟服务器/同一服务器的不同进程 / 端口)
- 对生产者进行限流,控制生产者发送消息的速度
- 消费者降级,对低优先级的消息进行丢弃或延迟处理。
10.RocketMQ如何保证消息的有序性?
1.为什么要保证消息的有序性?
消息的有序性是指消息的消费顺序能够与消息的发送顺序一致。例如:一个订单产生了3条消息,分别是订单创建、订单付款和订单完成。在消息消费时,同一条订单要严格按照这个顺序进行消费,否则业务会发生混乱。
2.如何保障消息有序性的?
局部顺序:RocketMQ 按照业务划分成不同的队列,不同业务的队列并发消费,同一业务中的队列按照发送消息的顺序消费
- 场景:订单、物流、交易等不同业务并行执行,同一个业务内部必须严格有序。比如同一个订单的 创建→付款→发货→完成 必须按顺序执行。
全局顺序:RocketMQ将所有业务消息放入到同一个队列中,消费时严格遵循发送先后顺序逐条执行
- 场景:订单、物流、交易等所有业务消息按发送顺序,全部存入同一个队列。
11.RocketMQ如何实现延迟消息的?
1.什么是延迟消息?
延迟消息是指将消息发送到MQ后,不会立即发送给消费者,而是设定一个定时时间,当时间一到就才会发送。(如在电商平台下单后 30 分钟未支付,“订单已取消” 的信息就会发送给消费者)
2.如何实现延迟消息?
一、先记住:RocketMQ 不支持 “任意时间” 延迟,只能选默认的18 个固定级别
1s、5s、10s、30s、 1m、2m、3m、4m、5m、6m、7m、8m、9m、10m、20m、30m、 1h、2h二、实现整体流程
Broker 接收延迟消息后,存入到延迟队列中,定时调度服务轮询检查延迟队列,当消息到期时,Broker 将消息放入到正常队列中,消费者开始正常消费。
12.RocektMQ怎么处理分布式事务?
1.什么是分布式事务?
指一个业务操作需要跨 2 个及以上独立服务 / 数据库 / 服务器才能够完成,而且这个业务操作要么全部成功,要么全部失败。
- 场景:A 服务给 B 服务转 100块钱,A服务减100,B服务加100。两个服务要么一起成功,还要么失败
2.如何来保证事务的一致性?
- 生产者向 Broker发送半消息 (半消息:消息成功发送给Broker 端,但消息被标记为不可投递状态,消费者无法消费)
- 生产者收到Broker成功响应的话,生产者开始执行本地事务
- 生产者根据本地事务执行结果,向Broker发送提交或回滚
- 如果发送的是提交,Broker将消息标记为正常消息,消费者可以正常消费
- 如果发送的是回滚,Broker将消息删除
- 如果Broker没收到二次确认,Broker 会主动回查生产者的事务执行结果
- 再根据结果进行提交处理或者回滚处理
13.死信队列?
死信队列是专门存储哪些无法被正常消费的消息
当一条消息初次被消费者消费失败时,RocketMQ会自动重试消费消息;当达到最大重试次数后,依然消费失败,则将消息放入到死信队列中,等待人工干预处理。
