当前位置: 首页 > news >正文

MQ生产者确认机制捕获到消息投递失败后如何重试?

要实现生产者确认机制失败后自动重试重新投递,核心思路是:将发送失败的消息暂存→按策略重试→跟踪重试状态→失败兜底。以下是具体实现思路和关键步骤,结合代码示例说明。

一、核心思路框架

当生产者通过 ConfirmCallback 收到 ack=false(Broker 未确认接收)或超时未收到确认时,说明消息发送失败。此时需将消息暂存到可靠存储(避免内存丢失),并按重试策略(次数、间隔)重新投递,直至成功或超过阈值后转入死信队列。

二、关键实现步骤

1. 定义“失败消息”存储结构

需持久化存储失败消息的核心信息,确保重启后不丢失。推荐用 数据库(MySQL/PostgreSQL)或 Redis(缓存+持久化),字段包括:

字段名 说明 示例值
msg_id 消息唯一ID(CorrelationData的ID) MSG-1690000000000-0.123456
exchange 目标交换机名称 order.exchange
routing_key 目标路由键 order.routingKey
message_body 消息体(JSON序列化) {"id":1001,"amount":99.9}
retry_count 当前重试次数 0(初始值)
max_retry 最大重试次数(如3次) 3
next_retry_time 下次重试时间(时间戳) 1690000060000(当前时间+10秒)
status 状态(待重试/重试中/失败) PENDING

2. 实现“失败消息”存储层

MySQL 为例,创建表存储失败消息:

CREATE TABLE mq_failed_message (id BIGINT PRIMARY KEY AUTO_INCREMENT,msg_id VARCHAR(64) NOT NULL UNIQUE COMMENT '消息唯一ID',exchange VARCHAR(128) NOT NULL COMMENT '交换机',routing_key VARCHAR(128) NOT NULL COMMENT '路由键',message_body TEXT NOT NULL COMMENT '消息体(JSON)',retry_count INT DEFAULT 0 COMMENT '当前重试次数',max_retry INT DEFAULT 3 COMMENT '最大重试次数',next_retry_time BIGINT NOT NULL COMMENT '下次重试时间戳(ms)',status VARCHAR(20) DEFAULT 'PENDING' COMMENT '状态:PENDING/RETRYING/FAILED',created_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP,updated_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,INDEX idx_next_retry_time (next_retry_time) COMMENT '按下次重试时间索引'
);

3. 发送消息时关联“失败存储”

生产者发送消息时,生成唯一 msg_id(如 UUID 或雪花算法),并在 ConfirmCallback 中处理失败逻辑:

@Service
public class RetryProducer {@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate FailedMessageStorage failedMessageStorage; // 失败消息存储接口public void sendWithRetry(Object message, String exchange, String routingKey) {// 1. 生成唯一消息IDString msgId = "MSG-" + System.currentTimeMillis() + "-" + UUID.randomUUID().toString().substring(0, 8);CorrelationData correlationData = new CorrelationData(msgId);// 2. 发送消息(携带CorrelationData)rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);// 3. 注册确认回调(处理成功/失败)rabbitTemplate.setConfirmCallback((correlationData1, ack, cause) -> {String id = correlationData1 != null ? correlationData1.getId() : null;if (id == null) return;if (ack) {// 确认成功:删除存储中的失败记录(若有)failedMessageStorage.deleteById(id);System.out.println("消息确认成功,ID: " + id);} else {// 确认失败:存入失败消息表,等待重试FailedMessage failedMsg = new FailedMessage();failedMsg.setMsgId(id);failedMsg.setExchange(exchange);failedMsg.setRoutingKey(routingKey);failedMsg.setMessageBody(JSON.toJSONString(message)); // 序列化为JSONfailedMsg.setRetryCount(0);failedMsg.setMaxRetry(3);failedMsg.setNextRetryTime(System.currentTimeMillis() + 10 * 1000); // 10秒后重试failedMsg.setStatus("PENDING");failedMessageStorage.save(failedMsg);System.err.println("消息确认失败,存入重试队列,ID: " + id + ",原因: " + cause);}});}
}

4. 实现重试调度器(核心)

通过 定时任务(如 Spring Scheduler)或 线程池 定期检查失败消息表,对 next_retry_time ≤ 当前时间 的消息执行重试:

@Component
@EnableScheduling
public class RetryScheduler {@Autowiredprivate FailedMessageStorage failedMessageStorage;@Autowiredprivate RabbitTemplate rabbitTemplate;@Autowiredprivate DeadLetterQueueHandler deadLetterQueueHandler; // 死信队列处理器// 每5秒扫描一次待重试消息@Scheduled(fixedRate = 5000)public void retryFailedMessages() {List<FailedMessage> pendingMsgs = failedMessageStorage.queryPendingMessages(System.currentTimeMillis());for (FailedMessage msg : pendingMsgs) {try {// 1. 标记为“重试中”(避免并发重复处理)failedMessageStorage.updateStatus(msg.getMsgId(), "RETRYING");// 2. 反序列化消息体Object message = JSON.parseObject(msg.getMessageBody(), Object.class); // 根据实际类型强转// 3. 重新发送消息(携带原msgId)CorrelationData correlationData = new CorrelationData(msg.getMsgId());rabbitTemplate.convertAndSend(msg.getExchange(), msg.getRoutingKey(), message, correlationData);// 4. 更新重试次数和下次重试时间(指数退避:10s→20s→40s)int newRetryCount = msg.getRetryCount() + 1;long nextRetryInterval = (long) (10 * Math.pow(2, newRetryCount - 1)) * 1000; // 指数退避long nextRetryTime = System.currentTimeMillis() + nextRetryInterval;if (newRetryCount >= msg.getMaxRetry()) {// 超过最大重试次数:转入死信队列deadLetterQueueHandler.sendToDlx(msg);failedMessageStorage.updateStatus(msg.getMsgId(), "FAILED");System.err.println("消息重试次数耗尽,转入死信队列,ID: " + msg.getMsgId());} else {// 更新重试状态(次数+1,下次重试时间)failedMessageStorage.updateRetryInfo(msg.getMsgId(), newRetryCount, nextRetryTime, "PENDING");System.out.println("消息重试中,ID: " + msg.getMsgId() + ",第" + newRetryCount + "次");}} catch (Exception e) {// 重试过程中异常:恢复状态为PENDING,等待下次扫描failedMessageStorage.updateStatus(msg.getMsgId(), "PENDING");System.err.println("重试发送失败,ID: " + msg.getMsgId() + ",错误: " + e.getMessage());}}}
}

5. 死信队列兜底(重试失败的最终处理)

当消息超过最大重试次数仍未成功,将其转入死信队列(DLX),人工介入处理:

@Component
public class DeadLetterQueueHandler {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendToDlx(FailedMessage msg) {// 发送到死信交换机(需提前配置死信队列和交换机)rabbitTemplate.convertAndSend("dlx.exchange", "dlx.routingKey", msg.getMessageBody(),message -> {MessageProperties props = message.getMessageProperties();props.setHeader("failed_reason", "max_retry_exceeded");props.setHeader("original_msg_id", msg.getMsgId());return message;});}
}

三、关键技术点说明

1. 重试策略

  • 指数退避:重试间隔随次数递增(如 10s→20s→40s),避免集中重试压垮 Broker;
  • 固定间隔:简单场景用固定间隔(如每 30 秒重试一次);
  • 随机间隔:避免多个生产者同时重试导致“惊群效应”。

2. 幂等性保障

重试可能导致消息重复投递,消费者需通过 唯一ID去重(如 Redis 记录已处理 msg_id,有效期 24 小时):

@Component
public class IdempotentConsumer {@Autowiredprivate RedisTemplate<String, Object> redisTemplate;@RabbitListener(queues = "order.queue")public void consume(String message, @Header("msg_id") String msgId) {String key = "processed_msg:" + msgId;if (Boolean.TRUE.equals(redisTemplate.hasKey(key))) {// 已处理,直接确认channel.basicAck(deliveryTag, false);return;}// 业务逻辑处理...redisTemplate.opsForValue().set(key, "1", 24, TimeUnit.HOURS); // 记录已处理channel.basicAck(deliveryTag, false);}
}

3. 存储选型对比

存储方式 优点 缺点 适用场景
MySQL 持久化可靠,支持复杂查询 性能较低,需维护数据库连接 生产环境(消息重要性高)
Redis 高性能,支持过期时间 数据易失(需开启RDB/AOF持久化) 中小规模、对性能要求高的场景
内存队列 速度快 重启丢失消息 测试环境或临时重试

四、总结

生产者确认失败后的重试重新投递,本质是“存储-调度-重试-兜底”的闭环:

  1. 存储:用数据库/Redis 持久化失败消息;
  2. 调度:定时任务扫描待重试消息;
  3. 重试:按指数退避策略重新发送,更新重试状态;
  4. 兜底:超过次数后转入死信队列,人工介入。

通过这套机制,可确保消息在网络抖动、Broker 临时不可用等场景下仍能最终投递成功,同时避免无限重试的资源浪费。

http://www.zskr.cn/news/63533.html

相关文章:

  • 在 Electron 框架中完成数据库的连接、读取和写入
  • 2025年四川石膏板公司推荐:成都鑫瑞凯越建材有限公司领衔前十榜单
  • 2025 武汉高三一对一辅导学校权威推荐榜单!
  • 2025年航空发动机维修与正规原厂发动机生产厂家十大推荐
  • 2025年三大EA888奥迪发动机厂家排行榜,再制造EA21
  • 2025年十大发动机再制造品牌排行榜,乐辉再制造发动机性价比
  • 2025年知名的大型双层玻璃反应釜/50升双层玻璃反应釜厂家最新权威推荐排行榜
  • 20232404 2025-2026-1 《网络与系统攻防技术》实验七实验报告
  • 指针深入第二弹--字符指针、数组指针、函数指针、函数指针数组、转移表的理解加运用 - 实践
  • 2025年质量好的全自动旋转蒸发器厂家选购指南与推荐
  • 完整教程:Windows 系统中ffmpeg安装问题的彻底解决
  • 2025年知名的红色展厅设计专业公司推荐,专业红色文化展馆建
  • 2025年热门的真空发生器热门厂家推荐榜单
  • 2025年质量好的真空发生器最新TOP厂家排名
  • 除菌过滤技术哪家强?国内优质企业及选择指南
  • 2025年评价高的可调节三段力铰链/不锈钢三段力铰链厂家推荐及选购参考榜
  • 2025年热门的不锈钢三段力铰链品牌厂家排行榜
  • 2025年知名的预包装食品包装机厂家选购指南与推荐
  • ai论文网站推荐:智能化工具提升学术创作效率
  • 2025年比较好的ENF级除醛母婴板/家具母婴板TOP品牌厂家排行榜
  • 2025年质量好的环保家具板厂家最新TOP实力排行
  • 出镜口播的数字人视频工具推荐:高效创作助手盘点
  • 能播新闻的数字人视频工具:高效内容创作新选择
  • 2025noip游记
  • APEX实战第7篇:如何高效管理频繁更新的数据库对象
  • 2025年靠谱的螺杆空压机/螺杆式空压机厂家推荐及选购指南
  • 美国商标注册服务商怎么选?2025 五大平台测评,最稳选择一目了然
  • 2025年质量好的插拔柜内灯/橱柜柜内灯TOP品牌厂家排行榜
  • 美国商标注册公司哪家靠谱?2025 六大平台测评,新手不花冤枉钱
  • 2025福建省数据安全大赛决赛wp