分布式事务
2026/1/15大约 4 分钟
分布式事务
为什么需要分布式事务
微服务架构下,一个业务操作可能涉及多个服务,需要保证数据一致性。
2PC 两阶段提交
角色
- 协调者:事务管理器
- 参与者:资源管理器
流程
问题
- 同步阻塞:参与者等待协调者指令
- 单点故障:协调者故障导致参与者阻塞
- 数据不一致:部分参与者提交失败
3PC 三阶段提交
在 2PC 基础上增加超时机制和预提交阶段。
流程
- CanCommit:询问是否可以提交
- PreCommit:预提交
- DoCommit:正式提交
改进
- 参与者超时后自动提交
- 减少阻塞时间
TCC
Try-Confirm-Cancel,业务层面的分布式事务。
三个阶段
| 阶段 | 说明 |
|---|---|
| Try | 预留资源 |
| Confirm | 确认提交 |
| Cancel | 取消回滚 |
示例
// 订单服务
public interface OrderService {
// Try:创建订单,状态为待确认
void tryCreate(Order order);
// Confirm:确认订单
void confirmCreate(Order order);
// Cancel:取消订单
void cancelCreate(Order order);
}
// 库存服务
public interface StockService {
// Try:冻结库存
void tryDeduct(String productId, int count);
// Confirm:扣减库存
void confirmDeduct(String productId, int count);
// Cancel:解冻库存
void cancelDeduct(String productId, int count);
}问题
- 空回滚:Try 未执行,Cancel 被调用
- 悬挂:Cancel 先于 Try 执行
- 幂等性:Confirm/Cancel 可能重复调用
解决方案
// 使用事务状态表
CREATE TABLE tcc_transaction (
xid VARCHAR(64) PRIMARY KEY,
status VARCHAR(20), -- TRYING, CONFIRMED, CANCELLED
create_time DATETIME
);
// Cancel 时检查状态
public void cancelDeduct(String xid, String productId, int count) {
TccTransaction tx = transactionDao.findByXid(xid);
if (tx == null) {
// 空回滚,记录状态防止悬挂
transactionDao.insert(new TccTransaction(xid, "CANCELLED"));
return;
}
if ("CANCELLED".equals(tx.getStatus())) {
// 幂等,已取消
return;
}
// 执行取消逻辑
}Saga
长事务拆分为多个本地事务,每个事务有对应的补偿操作。
两种模式
编排模式:中央协调器控制流程
编舞模式:服务之间通过事件通信
示例
// 正向操作
createOrder() -> deductStock() -> deductBalance() -> addPoints()
// 补偿操作(逆序)
cancelPoints() -> refundBalance() -> restoreStock() -> cancelOrder()本地消息表
通过消息队列实现最终一致性。
流程
实现
// 服务A
@Transactional
public void createOrder(Order order) {
// 1. 创建订单
orderDao.insert(order);
// 2. 写入消息表
Message message = new Message("ORDER_CREATED", order.getId());
messageDao.insert(message);
}
// 定时任务发送消息
@Scheduled(fixedRate = 1000)
public void sendMessages() {
List<Message> messages = messageDao.findPending();
for (Message message : messages) {
try {
mqTemplate.send(message);
messageDao.updateStatus(message.getId(), "SENT");
} catch (Exception e) {
// 重试
}
}
}
// 服务B
@RabbitListener(queues = "order.created")
public void handleOrderCreated(Order order) {
// 幂等处理
if (stockService.isProcessed(order.getId())) {
return;
}
stockService.deduct(order.getProductId(), order.getCount());
}RocketMQ 事务消息
// 发送事务消息
TransactionSendResult result = producer.sendMessageInTransaction(message, new LocalTransactionExecuter() {
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
try {
// 执行本地事务
orderService.createOrder(order);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}, null);
// 事务状态回查
public class TransactionListenerImpl implements TransactionListener {
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 查询本地事务状态
Order order = orderService.findById(msg.getKeys());
if (order != null) {
return LocalTransactionState.COMMIT_MESSAGE;
}
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}Seata
阿里开源的分布式事务框架。
模式
| 模式 | 说明 | 适用场景 |
|---|---|---|
| AT | 自动补偿 | 大多数场景 |
| TCC | 手动补偿 | 高性能要求 |
| Saga | 长事务 | 业务流程长 |
| XA | 强一致性 | 传统数据库 |
AT 模式
@GlobalTransactional
public void purchase(String userId, String productId, int count) {
// 扣减库存
stockService.deduct(productId, count);
// 创建订单
orderService.create(userId, productId, count);
// 扣减余额
accountService.deduct(userId, totalAmount);
}原理
- 解析 SQL,记录前后镜像
- 生成回滚日志
- 提交本地事务
- 全局提交:删除回滚日志
- 全局回滚:根据回滚日志恢复数据
方案对比
| 方案 | 一致性 | 性能 | 复杂度 | 适用场景 |
|---|---|---|---|---|
| 2PC | 强 | 低 | 低 | 传统数据库 |
| TCC | 最终 | 高 | 高 | 高性能要求 |
| Saga | 最终 | 中 | 中 | 长事务 |
| 本地消息表 | 最终 | 高 | 中 | 异步场景 |
| 事务消息 | 最终 | 高 | 低 | 消息驱动 |