记录分布式事务的实现方式和用法(有借助AI)

记录分布式事务的实现方式和用法(有借助AI)

分布式事务 — 种类与 Java 实现


一、为什么需要分布式事务

单体架构中,事务靠数据库 ACID 保证:

BEGIN;UPDATEaccountSETbalance=balance-100WHEREid=1;UPDATEaccountSETbalance=balance+100WHEREid=2;COMMIT;

但在微服务架构下,一个业务操作跨多个服务、多个数据库:

订单服务 (MySQL) → 扣库存 ↓ 账户服务 (MySQL) → 扣余额 ↓ 物流服务 (PostgreSQL) → 创建运单 ❌ 没有分布式事务 → 库存扣了但余额没扣 → 灾难

二、理论基石

2.1 CAP 定理

一个分布式系统最多同时满足两个: C (Consistency) 一致性 — 所有节点同一时刻数据相同 A (Availability) 可用性 — 每个请求都能获得响应 P (Partition) 分区容错 — 网络分区时系统仍能工作 现实:P 不可回避 / \ 选 CP 选 AP (强一致+可能不可用) (最终一致+高可用) 分布式事务中,大多数方案选择 AP(最终一致性)

2.2 BASE 理论

BA (Basically Available) 基本可用 — 允许短暂不可用 S (Soft State) 软状态 — 允许中间状态 E (Eventually Consistent) 最终一致 — 不要求实时一致 BASE 是分布式事务设计的核心指导思想

三、分布式事务方案总览

方案一致性性能复杂度Java 实现
XA / 2PC强一致Atomikos, Narayana
TCC强一致Seata TCC, 自研
AT 模式最终一致Seata AT
Saga最终一致Seata Saga, 自研
本地消息表最终一致自研 + MQ
可靠消息最终一致RocketMQ 事务消息
最大努力通知最终一致MQ + 重试

四、方案详解 + Java 实现

4.1 XA / 2PC(两阶段提交)

原理

TM(事务管理器) │ ┌──────┼──────┐ ▼ ▼ ▼ RM1 RM2 RM3 (资源管理器 = 数据库) 阶段一(Prepare):TM 问所有 RM "准备好了吗?" 阶段二(Commit): 所有 RM 说 Yes → TM 发出 Commit 任一 RM 说 No → TM 发出 Rollback

Java 实现 — Atomikos

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jta-atomikos</artifactId></dependency>
# application.ymlspring:jta:atomikos:datasource:order:xa-data-source-class-name:com.mysql.cj.jdbc.MysqlXADataSourcexa-properties:url:jdbc:mysql://localhost:3306/order_dbuser:rootpassword:rootaccount:xa-data-source-class-name:com.mysql.cj.jdbc.MysqlXADataSourcexa-properties:url:jdbc:mysql://localhost:3306/account_dbuser:rootpassword:root
@ServicepublicclassOrderService{@AutowiredprivateJdbcTemplateorderJdbc;@AutowiredprivateJdbcTemplateaccountJdbc;@Transactional// Spring 自动使用 JTA 事务管理器publicvoidcreateOrder(LonguserId,LongproductId,BigDecimalamount){// 两个数据库操作在同一个 XA 事务中orderJdbc.update("INSERT INTO orders(user_id, product_id, amount) VALUES(?,?,?)",userId,productId,amount);accountJdbc.update("UPDATE account SET balance = balance - ? WHERE user_id = ?",amount,userId);}}

优缺点

✅ 优点❌ 缺点
强一致性性能差(锁资源时间长)
数据库原生支持单点故障(TM 崩溃全部阻塞)
简单透明不适合高并发

4.2 Seata AT 模式(推荐入门)

原理:自动拦截 SQL → 记录 undo_log → 提交本地事务 → 异步全局提交/回滚

阶段一:每个服务执行自己的 SQL,同时记录 undo_log 阶段二:TC(事务协调器)决定全局提交或回滚 提交 → 异步删除 undo_log 回滚 → 根据 undo_log 反向补偿

Java 实现 — Seata AT

<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId></dependency>
seata:tx-service-group:my_tx_groupservice:vgroup-mapping:my_tx_group:defaultgrouplist:default:127.0.0.1:8091
@ServicepublicclassOrderService{@AutowiredprivateOrderMapperorderMapper;@AutowiredprivateAccountFeignClientaccountClient;@GlobalTransactional(name="create-order",rollbackFor=Exception.class)publicvoidcreateOrder(Orderorder){// ① 本地:创建订单orderMapper.insert(order);// ② 远程:扣余额(Feign 调用账户服务)accountClient.debit(order.getUserId(),order.getAmount());// 任一失败 → Seata 自动回滚:删订单 + 补余额}}
-- Seata 需要在每个业务数据库添加 undo_log 表CREATETABLEundo_log(idBIGINTAUTO_INCREMENTPRIMARYKEY,branch_idBIGINTNOTNULL,xidVARCHAR(128)NOTNULL,contextVARCHAR(128)NOTNULL,rollback_infoLONGBLOBNOTNULL,log_statusINTNOTNULL,log_createdDATETIMENOTNULL,log_modifiedDATETIMENOTNULL,UNIQUEKEYux_undo_log(xid,branch_id));

4.3 TCC(Try-Confirm-Cancel)

原理:把每个操作拆成三个接口,业务方自己实现

Try — 资源预留(冻结库存) Confirm — 资源确认(扣库存) Cancel — 资源释放(恢复库存)

Java 实现 — Seata TCC

@LocalTCCpublicinterfaceInventoryTccAction{/** * Try: 冻结库存 */@TwoPhaseBusinessAction(name="inventory-action",commitMethod="confirm",rollbackMethod="cancel")booleantryDeduct(@BusinessActionContextParameter(paramName="productId")LongproductId,@BusinessActionContextParameter(paramName="count")intcount);/** * Confirm: 确认扣减 */booleanconfirm(BusinessActionContextctx);/** * Cancel: 回滚恢复 */booleancancel(BusinessActionContextctx);}@ServicepublicclassInventoryTccActionImplimplementsInventoryTccAction{@AutowiredprivateInventoryMapperinventoryMapper;@OverridepublicbooleantryDeduct(LongproductId,intcount){// 冻结库存:stock → frozen_stockreturninventoryMapper.freeze(productId,count)>0;}@Overridepublicbooleanconfirm(BusinessActionContextctx){LongproductId=Long.valueOf(ctx.getActionContext("productId").toString());intcount=(int)ctx.getActionContext("count");// 真正扣减:frozen_stock 清零returninventoryMapper.confirmDeduct(productId,count)>0;}@Overridepublicbooleancancel(BusinessActionContextctx){// 恢复库存:frozen_stock → stockreturninventoryMapper.rollback(ctx);}}

优缺点

✅ 优点❌ 缺点
性能好(不锁资源)代码侵入性强
强一致性需要业务方实现三个接口
适合资金交易空回滚、悬挂等异常要处理

4.4 Saga 模式(长事务首选)

原理:把长事务拆成多个有序的本地事务,每个事务有对应的补偿操作

正向:T1 → T2 → T3 → T4 补偿:C1 ← C2 ← C3 (任一失败,从失败点往前补偿) Saga 状态机: [T1] ──成功──→ [T2] ──成功──→ [T3] ──成功──→ 完成 │ │ │ 失败 失败 失败 │ │ │ ▼ ▼ ▼ 结束 [C1]←──────────────┘ │ ▼ 结束

Java 实现 — Seata Saga(状态机 DSL)

{"Name":"create-order-saga","StartState":"CreateOrder","States":{"CreateOrder":{"Type":"ServiceTask","ServiceName":"orderService.create","Next":"DeductInventory","CompensateState":"CancelOrder"},"DeductInventory":{"Type":"ServiceTask","ServiceName":"inventoryService.deduct","Next":"DebitAccount","CompensateState":"RestoreInventory"},"DebitAccount":{"Type":"ServiceTask","ServiceName":"accountService.debit","Next":"Succeed","CompensateState":"RefundAccount"},"Succeed":{"Type":"Succeed"}}}

优缺点

✅ 优点❌ 缺点
适合长事务(分钟/天级)无隔离性(中间状态可能被读到)
高性能补偿逻辑不是总能实现
可编排调试困难

4.5 本地消息表 + MQ(最常用自研方案)

原理

服务 A 的业务操作 + 消息表在同一个本地事务中 ↓ 定时任务扫描消息表 → 发 MQ ↓ 服务 B 消费 MQ → 执行自己的操作 ↓ 消费成功后更新消息状态 / 失败重试

Java 实现

-- 消息表CREATETABLEtransactional_message(idBIGINTAUTO_INCREMENTPRIMARYKEY,business_idVARCHAR(64)NOTNULL,topicVARCHAR(128)NOTNULL,tagVARCHAR(128),message_keyVARCHAR(128),bodyTEXTNOTNULL,statusVARCHAR(20)NOTNULLDEFAULT'PENDING',-- PENDING → SENT → CONSUMEDretry_countINTDEFAULT0,max_retryINTDEFAULT10,next_retryDATETIME,created_atDATETIMEDEFAULTNOW(),updated_atDATETIMEDEFAULTNOW());
@ServicepublicclassOrderService{@AutowiredprivateOrderMapperorderMapper;@AutowiredprivateMessageMappermessageMapper;@AutowiredprivateRocketMQTemplaterocketMQTemplate;@TransactionalpublicvoidcreateOrder(Orderorder){// ① 业务操作orderMapper.insert(order);// ② 消息记录(同一个本地事务)Messagemsg=Message.builder().businessId(order.getId().toString()).topic("ORDER_TOPIC").tag("ORDER_CREATED").body(JSON.toJSONString(order)).status("PENDING").build();messageMapper.insert(msg);}// ③ 定时任务:扫描待发送消息@Scheduled(fixedDelay=5000)publicvoidsendPendingMessages(){List<Message>messages=messageMapper.selectPending();for(Messagemsg:messages){try{rocketMQTemplate.convertAndSend(msg.getTopic()+":"+msg.getTag(),msg.getBody());messageMapper.updateStatus(msg.getId(),"SENT");}catch(Exceptione){// 失败重试 + 达到上限告警messageMapper.incrementRetry(msg.getId());}}}}

4.6 RocketMQ 事务消息

@ServicepublicclassOrderService{@AutowiredprivateRocketMQTemplaterocketMQTemplate;publicvoidcreateOrder(Orderorder){// RocketMQ 事务消息 — 天然支持分布式事务rocketMQTemplate.sendMessageInTransaction("order-tx-producer-group","ORDER_TOPIC:ORDER_CREATED",MessageBuilder.withPayload(JSON.toJSONString(order)).build(),order.getId()// 业务参数,executeLocalTransaction 中可用);}// 本地事务执行@RocketMQTransactionListener(txProducerGroup="order-tx-producer-group")publicclassOrderTxListenerimplementsRocketMQLocalTransactionListener{@OverridepublicRocketMQLocalTransactionStateexecuteLocalTransaction(org.springframework.messaging.Messagemsg,Objectarg){try{// 本地事务:创建订单 + 扣库存orderMapper.insert(order);returnRocketMQLocalTransactionState.COMMIT;}catch(Exceptione){returnRocketMQLocalTransactionState.ROLLBACK;}}@OverridepublicRocketMQLocalTransactionStatecheckLocalTransaction(org.springframework.messaging.Messagemsg){// MQ 回查:确认本地事务最终状态LongorderId=(Long)msg.getHeaders().get("orderId");Orderorder=orderMapper.selectById(orderId);returnorder!=null?RocketMQLocalTransactionState.COMMIT:RocketMQLocalTransactionState.ROLLBACK;}}}

五、方案选择决策树

需要强一致性? ├─ 是 → 并发低? → XA/2PC │ 并发高? → TCC(资源预留模式) │ └─ 否 → 最终一致即可 ├─ 短事务(<1分钟)→ Seata AT / RocketMQ 事务消息 ├─ 长事务(分钟~天)→ Saga └─ 链路长/不确定?→ 本地消息表 + MQ

六、方案对比速查表

方案一致性性能代码侵入隔离性适用场景
XA/2PC⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐(透明)⭐⭐⭐⭐⭐低并发资金交易
Seata AT⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐(透明)⭐⭐⭐通用微服务(推荐)
TCC⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐高并发资金/库存
Saga⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐长流程(预订/审批)
本地消息表⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐异步解耦场景
事务消息⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐MQ 生态刚需

七、总结

选型口诀: 短事务用 AT 省心省力 高并发用 TCC 精准控制 长流程用 Saga 补偿编排 异步解耦用本地消息表 / RocketMQ 永远不要在生产环境用 XA,除非你知道自己在干什么 永远要有监控和告警,所有最终一致方案都可能出现不一致