当前位置: 首页 > news >正文

RabbitMQ投递回调机制以及策略业务补偿

————以点赞消息案例为例

一、关于RabbitMQ回调机制知识点补充: https://www.cnblogs.com/Mr-Keep/p/19140274

在 RabbitMQ 中,生产者发送消息后,有可能遇到以下几种情况:

  1. 消息成功投递到交换机(Exchange)

  2. 消息未能成功投递到交换机(Exchange)

  3. 消息成功进入交换机但无法路由到队列(Queue)

如果生产者端没有回调确认机制,就可能出现严重的数据不一致:

举例: Redis 已经增加点赞数,但消息并未真正进入 MQ,数据库后续也无法更新,就出现了 “缓存超前、数据库缺失” 的问题。

为了解决这种问题,Spring AMQP 提供了:

  • RabbitTemplate.setConfirmCallback()

  • RabbitTemplate.setReturnsCallback()

来捕获和处理消息投递的成功与失败。

但是在复杂系统中,不同的业务消息(例如“下单”、“扣库存”、“发积分”)在投递失败时,需要采取不同的补偿逻辑

弊端:如果你只写一份大而全的回调逻辑,代码就会充满大量的 if else 判断,非常难维护。

二、策略模式思想引入

策略模式的核心思想是:定义一系列算法(或行为),让它们可以相互替换,且算法的变化不会影响使用算法的客户。

  • “算法” ≈ “不同的消息回调处理逻辑”

  • “客户” ≈ “RabbitTemplate 的 ConfirmCallback 回调”

操作:通过(根据业务抽象)接口 + Map 注入,在运行时动态选择。

代码实现

1、定义统一的回调处理接口

public interface ConfirmCallbackService {/*** 投递失败后的回调处理* @param message 投递的消息对象*/void confirmCallback(Message message);
}

例:定义点赞案例的实现类(可选):

public class LikeConfirmCallback implements ConfirmCallbackService{/*** 注入RedisTemplate*/private final RedisTemplate<String,Integer> redisTemplate;/*** 执行失败后的反向操作* @param message 投递的消息对象*/@Overridepublic void confirmCallback(Message message) {byte[] bytes = message.getBody();//反向序列化为LikeDTO对象try {LikeDTO dto = new ObjectMapper().readValue(bytes, LikeDTO.class);if(dto.getLikeStatus()){redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+dto.getEid(), dto.getUid());}else{redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+dto.getEid(),dto.getUid());}} catch (IOException e) {throw new RuntimeException(e);}}
}

小技巧:

  • 可选不单独定义类,而是让业务层本身实现ConfirmCallbackService接口,简化书写操作
  • 分离成策略类则更利于模块化、解耦和扩展。

2、回调上下文: 策略分发器

@Component
@RequiredArgsConstructor
@Slf4j
public class ConfirmCallbackContext {/*** 注入RabbitTemplate*/private final RabbitTemplate rabbitTemplate;/*** 注入所有ConfirmCallbackService的实现类* 在不同的业务场景调用不同的实现来处理投递失败的业务逻辑*/private final Map<String,ConfirmCallbackService> confirmCallbackServiceMap;/*** 统一调用回调处理* 在容器初始化就执行这个方法*/@PostConstructpublic void confirmCallback(){rabbitTemplate.setConfirmCallback((cdata,ack,cause)->{ReturnedMessage returnedMessage = cdata.getReturned();if(ack){log.info("The message was delivered to the{}",returnedMessage);}else{//获取业务实现的bean的idString beanName = returnedMessage.getReplyText();//根据bean的名称从map中获取相应的实现类ConfirmCallbackService callbackService = confirmCallbackServiceMap.get(beanName);callbackService.confirmCallback(returnedMessage.getMessage());}});}
}

核心原理:

  • Spring Boot 会自动扫描所有实现 ConfirmCallbackService 的 Bean

  • Bean 名称作为 key,Bean 实例作为 value 注入到 Map<String, ConfirmCallbackService>

  • ConfirmCallbackContext 根据 replyText 动态找到对应的策略实现类

3.消息发送端封装

@Component
@RequiredArgsConstructor
public class RabbitManager<T> {private final RabbitTemplate rabbitTemplate;public void send(String exchange,String routingKey,String callbackBeanName,T data){try {//创建cdata对象并设置一个idCorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());//将投递的数据转换为byte[]byte[] bytes = new ObjectMapper().writeValueAsBytes(data);//将bytes封装为Message对象Message message = new Message(bytes);//创建一个投递失败时返回的消息对象ReturnedMessage returnedMessage = new ReturnedMessage(message, 0,callbackBeanName, exchange,routingKey);//将ReturnedMesssage保存到cdata中correlationData.setReturned(returnedMessage);//发送rabbitTemplate.convertAndSend(exchange,routingKey,data,correlationData);} catch (Exception e) {throw new RuntimeException(e);}}
}

** 关键点:**
callbackBeanName 会被放进 replyText 中,作为“回调策略的指针”。

4.点赞业务逻辑方法
4.1简化写法

 @Overridepublic LikeDTO likeEssay(Integer uid, Integer eid) {boolean likeStatus = false;//如果缓存中存在用户id则取消点赞,不存在则添加用户id记录点赞if(isLike(eid, uid)) {//将用户ID从set集合中移除redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid, uid);} else {likeStatus = true;//将用户ID添加到set集合中redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid, uid);}//获取当前帖子在redis中的点赞总数Long likeCount = redisTemplate.opsForSet().size(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid);//创建LikeDTO封装修改的数据并发布到消息队列LikeDTO likeDTO = new LikeDTO(eid, uid, likeCount,likeStatus);//发送到mq异步更新到数据库rabbitManager.send(RabbitmqConfig.EXCHANGE_NAME, RabbitmqConfig.ROUTING_KEY,"likeServiceImpl", likeDTO);return likeDTO;}/*** 消息投递失败后的处理* @param message 失败后返回的消息*/@Overridepublic void confirmCallback(Message message) {byte[] bytes = message.getBody();try {//反序列化为LikeDTO对象LikeDTO dto = new ObjectMapper().readValue(bytes, LikeDTO.class);//执行反向操作if(dto.getLikeStatus()) {redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + dto.getEid(), dto.getUid());} else {redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + dto.getEid(), dto.getUid());}} catch (IOException e) {throw new RuntimeException(e);}}

4.2 有业务实现类时
````
public LikeDTO likeEssay(Integer uid, Integer eid) {
boolean likeStatus = false;

    //如果缓存中存在用户id则取消点赞,不存在则添加用户id记录点赞if(isLike(uid,eid)){//取消点赞redisTemplate.opsForSet().remove(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+eid,uid.toString());likeMapper.deleteLike(eid,uid);}else{likeStatus = true;//将用户ID添加到set集合中redisTemplate.opsForSet().add(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue()+eid,uid.toString());}//获取当前帖子在redis中的点赞总数Long likeCount = redisTemplate.opsForSet().size(LikeEssayEnum.LIKE_ESSAY_PREFIX.getValue() + eid);//创建LikeDTO封装修改的数据并发布到消息队列LikeDTO likeDTO = new LikeDTO(eid, uid, likeCount,likeStatus);//发送到mq异步更新到数据库rabbitManager.send(RabbitmqConfig.EXCHANGE_NAME,RabbitmqConfig.ROUTING_KEY,"likeConfirmCallbackService",likeDTO);return likeDTO;
}

最终目标:当点赞消息从生产者发送到 RabbitMQ 时,一旦投递失败,系统能自动执行反向补偿逻辑,确保 Redis 与数据库的一致性。
http://www.zskr.cn/news/21636.html

相关文章:

  • Cookie如何设置HTTPOnly和Secure 以防止XSS跨站脚本攻击
  • Vue中展示字符串,有换行符的怎么换行展示
  • 容器技术k8s - 指南
  • c语言之对齐函数代码示例
  • 什么是Barriers IO
  • MySQL中NULL值的5个反直觉行为,扭到了吗?
  • 2025 最新阳澄湖大闸蟹权威推荐排行榜,揭秘优质品牌的独特魅力大闸蟹蟹卡 / 大闸蟹礼盒 / 大闸蟹礼券 / 好蟹汇大闸蟹选择指南
  • 碳中和背景下的能源数字化:MyEMS 为企业提供精准碳管理方案
  • 桌面预测类编写,桌面%雷达,信号预测%系统构建,基于python,tk,scikit-learn机器学习算法实现,桌面预支持向量机分类算法,CSV无数据库
  • UEFI安装Windows 7 64位
  • MySQL,SqlServer,postgresql中,如何实现锁定一张表
  • 本地虚拟机windows 安装mysql数据库版本可选,外部如何连接
  • MLGO微算法科技创新AI技术:基于DRL的设备边缘协同推理优化系统,助力无线智能感知进入“极致高效”时代
  • C 语言 - fopen、fprintf、fread、fwrite、fputc 操作文件函数解析
  • 2025 碳晶板厂家最新推荐榜:含木纹 / 白色 / 全屋整装等品类,西南及全国优质厂家权威甄选指南
  • 把列表中所有姓周的人的信息删掉(此题有坑, 请慎重): lst = [周⽼⼆, 周星星, 麻花藤, 周扒⽪] # 结果: lst = [麻花藤]
  • 2022 ICPC 香港 L
  • AI 姓氏头像生成小程序管理系统:专属头像定制与流量变现解决方案
  • 2025年锅炉厂家最新权威推荐榜:工业锅炉、燃气锅炉、电热锅炉、蒸汽锅炉专业制造商实力解析与选购指南
  • CCSP2025 游记
  • 2025 年国内算力云公司最新推荐排行榜:聚焦 AI 训推与 MaaS 服务,助力企业精准选优质合作伙伴算力云大模型/算力云深度学习/微调算力云/蓝耘算力云公司推荐
  • 2025年清洗机厂家最新权威推荐榜:高压清洗机、工业清洗机、超声波清洗机源头厂家综合实力解析
  • 2025年微弧氧化厂家最新推荐排行榜,微弧氧化/铝合金微弧氧化/镁合金微弧氧化/黑色微弧氧化/钛合金微弧氧化/微弧氧化技术加工,渔具/雷达/医疗器械微弧氧化专业供应商
  • 2025年武汉防水公司最新权威推荐榜:商铺装修防水,别墅补漏防水,厂房防潮防水专业施工与长效保障口碑之选
  • Gephi 0.9.2超星系保姆级下载安装教程及配置教程(新手适用,附安装包下载)
  • Perforce:Stream实战指南
  • 2025年立式扒胎机厂家最新权威推荐榜:专业设备批发与高效服务口碑之选
  • Python基础入门:从环境搭建到基础运算
  • 洛谷题单指南-进阶数论-CF757B Bashs Big Day
  • 2025年无心/外圆磨床、滚丝机、外圆抛光机、无心/外圆磨床送料机/送料架/自动化/机械手厂家最新权威推荐排行榜