Spring Boot 实现分布式事务的优雅方案
原文:本地消息表:Spring Boot 实现分布式事务的优雅方案-51CTO.COM
前言
在微服务架构中,分布式事务一直是一个棘手的问题,常见的分布式事务解决方案包括:
2PC(两阶段提交)
2PC即两阶段提交协议,是将整个事务流程分为两个阶段,准备阶段(Prepare phase)、提交阶段(commit phase):
- 准备阶段(Prepare phase):事务管理器给每个参与者发送Prepare消息,每个数据库参与者在本地执行事务,并写本地的Undo/Redo日志,此时事务没有提交。 (Undo日志是记录修改前的数据,用于数据库回滚,Redo日志是记录修改后的数据,用于提交事务后写入数据文件)
- 提交阶段(commit phase):如果事务管理器收到了参与者的执行失败或者超时消息时,直接给每个参与者发送回滚(Rollback)消息;否则,发送提交(Commit)消息;参与者根据事务管理器的指令执行提交或者回滚操作,并释放事务处理过程中使用的锁资源。注意:必须在最后阶段释放锁资源。
当所有参与者均反馈yes,提交事务
图片
当任何阶段1一个参与者反馈no,中断事务
图片
TCC(Try-Confirm-Cancel)
TCC要求每个分支事务实现三个操作:预处理Try、确认Confirm、撤销Cancel。Try操作做业务检查及资源预留,Confirm做业务确认操作,Cancel实现一个与Try相反的操作即回滚操作。TM首先发起所有的分支事务的try操作,任何一个分支事务的try操作执行失败,TM将会发起所有分支事务的Cancel操作,若try操作全部成功,TM将会发起所有分支事务的Confirm操作,其中Confirm/Cancel操作若执行失败,TM会进行重试。
- Try阶段是做业务检查(一致性)及资源预留(隔离),此阶段仅是一个初步操作,它和后续的Confirm一起才能真正构成一个完整的业务逻辑。
- Confirm阶段是做确认提交,Try阶段所有分支事务执行成功后开始执行Confirm。通常情况下,采用TCC则认为Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。若Confirm阶段真的出错了,需引入重试机制或人工处理。
- Cancel阶段是在业务执行错误需要回滚的状态下执行分支事务的业务取消,预留资源释放。通常情况下,采用TCC则认为Cancel阶段也是一定成功的。若Cancel阶段真的出错了,需引入重试机制或人工处理。
- TM事务管理器可以实现为独立的服务,也可以让全局事务发起方充当TM的角色,TM独立出来是为了成为公用组件,是为了考虑系统结构和软件复用。
当Try阶段服务全部正常执行, 执行确认业务逻辑操作
图片
当Try阶段存在服务执行失败, 进入Cancel阶段
图片
可靠消息最终一致性
可靠消息最终一致性方案是指当事务发起方执行完成本地事务后并发出一条消息,事务参与方(消息消费者)一定能够接收消息并处理事务成功,此方案强调的是只要消息发给事务参与方最终事务要达到一致。
正常情况——事务主动方发消息
图片
异常情况——事务主动方消息恢复
图片
本地消息表
本地消息表这个方案最初是eBay提出的,此方案的核心是通过本地事务保证数据业务操作和消息的一致性,然后通过定时任务将消息发送至消息中间件,待确认消息发送给消费方成功再将消息删除。
本地消息表的核心思想是:将分布式事务拆分为多个本地事务,并通过消息表记录事务状态。具体流程如下:
- 业务操作与消息记录:在同一个本地事务中,执行业务操作并记录消息到本地数据库的消息表中
- 消息发送:事务提交后,通过定时任务或事件监听机制,将消息发送到消息队列
- 消息消费:下游服务从消息队列消费消息并执行业务操作
- 消息确认:下游服务处理完成后,通过某种机制确认消息已处理
图片
这种方案的最大优势是将分布式事务转换为本地事务,利用数据库的ACID特性保证业务操作和消息记录的原子性。
实现步骤
本地消息表因其实现简单、可靠性高、性能良好,成为中小型项目的首选方案,本文将详细介绍本地消息表的原理,并结合Spring Boot提供完整的实现方案。
数据库设计
-- 消息表结构
CREATE TABLE message (
id VARCHAR(32) PRIMARY KEY,
content TEXT NOT NULL,
topic VARCHAR(100) NOT NULL,
status VARCHAR(20) NOT NULL COMMENT 'INIT, SENDING, SENT, FAILED',
retry_count INT DEFAULT 0,
next_retry_time DATETIME,
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL,
INDEX idx_status (status),
INDEX idx_next_retry_time (next_retry_time)
);
-- 业务表示例(如订单表)
CREATE TABLE orders (
id VARCHAR(32) PRIMARY KEY,
user_id VARCHAR(32) NOT NULL,
amount DECIMAL(10, 2) NOT NULL,
status VARCHAR(20) NOT NULL,
create_time DATETIME NOT NULL,
update_time DATETIME NOT NULL
);
消息结构
public enum MessageStatus {
INIT("初始化"),
SENDING("发送中"),
SENT("已发送"),
FAILED("发送失败");
private final String description;
MessageStatus(String description) {
this.description = description;
}
public String getDescription() {
return description;
}
}
public interface MessageRepository extends JpaRepository<Message, String> {
List<Message> findByStatusAndNextRetryTimeLessThanEqual(String status, LocalDateTime now);
@Transactional
@Modifying
@Query("UPDATE Message m SET m.status = ?2, m.updateTime = ?3 WHERE m.id = ?1")
int updateStatus(String id, String status, LocalDateTime updateTime);
@Transactional
@Modifying
@Query("UPDATE Message m SET m.status = ?2, m.retryCount = m.retryCount + 1, " +
"m.nextRetryTime = ?3, m.updateTime = ?3 WHERE m.id = ?1")
int updateForRetry(String id, String status, LocalDateTime nextRetryTime);
@Transactional
@Modifying
@Query("DELETE FROM Message m WHERE m.createTime < ?1")
int deleteOlderThan(LocalDateTime threshold);
}
- MessageStatus枚举:定义消息的四种状态
- MessageRepository接口:继承JPA的JpaRepository,提供基本CRUD操作,并定义自定义查询方法
消息服务实现
@Service
public class MessageServiceImpl implements MessageService {
@Autowired
private MessageRepository messageRepository;
@Autowired
private RabbitTemplate rabbitTemplate;
@Value("${message.retry.interval:60000}")
private long retryInterval;
@Value("${message.max.retry:10}")
private int maxRetry;
@Value("${message.retention.days:30}")
private int retentionDays;
@Override
@Transactional
public Message saveMessage(String content, String topic) {
Message message = new Message();
message.setId(UUID.randomUUID().toString().replace("-", ""));
message.setContent(content);
message.setTopic(topic);
message.setStatus(MessageStatus.INIT.name());
message.setCreateTime(LocalDateTime.now());
message.setUpdateTime(LocalDateTime.now());
return messageRepository.save(message);
}
@Override
public void sendMessage(Message message) {
try {
// 更新消息状态为发送中
messageRepository.updateStatus(message.getId(), MessageStatus.SENDING.name(), LocalDateTime.now());
// 发送消息到RabbitMQ
rabbitTemplate.convertAndSend(message.getTopic(), message.getContent());
// 更新消息状态为已发送
messageRepository.updateStatus(message.getId(), MessageStatus.SENT.name(), LocalDateTime.now());
} catch (Exception e) {
// 发送失败,更新重试信息
handleSendFailure(message, e);
}
}
@Override
public void processPendingMessages() {
List<Message> pendingMessages = messageRepository.findByStatusAndNextRetryTimeLessThanEqual(
MessageStatus.INIT, LocalDateTime.now());
for (Message message : pendingMessages) {
sendMessage(message);
}
}
@Override
public void retryFailedMessages() {
List<Message> failedMessages = messageRepository.findByStatusAndNextRetryTimeLessThanEqual(
MessageStatus.FAILED, LocalDateTime.now());
for (Message message : failedMessages) {
if (message.getRetryCount() < maxRetry) {
try {
// 更新消息状态为发送中
messageRepository.updateStatus(message.getId(), MessageStatus.SENDING.name(), LocalDateTime.now());
// 重试发送消息
rabbitTemplate.convertAndSend(message.getTopic(), message.getContent());
// 更新消息状态为已发送
messageRepository.updateStatus(message.getId(), MessageStatus.SENT.name(), LocalDateTime.now());
} catch (Exception e) {
// 重试失败,更新重试信息
handleSendFailure(message, e);
}
} else {
// 超过最大重试次数,记录日志并标记为永久失败
// 可以添加告警机制
System.err.println("Message exceeded max retry count: " + message.getId());
}
}
}
@Override
public void cleanOldMessages() {
LocalDateTime threshold = LocalDateTime.now().minusDays(retentionDays);
int deletedCount = messageRepository.deleteOlderThan(threshold);
System.out.println("Deleted " + deletedCount + " old messages");
}
@Override
public void updateMessageStatus(String messageId, MessageStatus status) {
messageRepository.updateStatus(messageId, status.name(), LocalDateTime.now());
}
private void handleSendFailure(Message message, Exception e) {
// 计算下一次重试时间
LocalDateTime nextRetryTime = LocalDateTime.now().plusSeconds(retryInterval);
// 更新消息状态为失败并增加重试次数
messageRepository.updateForRetry(
message.getId(),
MessageStatus.FAILED.name(),
nextRetryTime
);
// 记录错误日志
System.err.println("Failed to send message: " + message.getId() + ", error: " + e.getMessage());
}
}
- saveMessage:创建并保存新消息,设置初始状态为INIT
- sendMessage:发送消息到消息队列,处理发送成功和失败的情况
- processPendingMessages:处理待发送的消息
- retryFailedMessages:重试发送失败的消息,实现最大重试次数控制
- cleanOldMessages:清理过期消息,防止消息表过大
- handleSendFailure:处理发送失败的情况,更新重试信息
定时任务配置
@Component
public class MessageScheduler {
@Autowired
private MessageService messageService;
/**
* 定时处理待发送的消息
*/
@Scheduled(fixedRate = 10000) // 每10秒执行一次
public void processPendingMessages() {
messageService.processPendingMessages();
}
/**
* 定时重试失败的消息
*/
@Scheduled(fixedRate = 30000) // 每30秒执行一次
public void retryFailedMessages() {
messageService.retryFailedMessages();
}
/**
* 定时清理过期消息
*/
@Scheduled(fixedRateString = "${message.clean.interval}")
public void cleanOldMessages() {
messageService.cleanOldMessages();
}
}
- processPendingMessages:每 10 秒检查一次待发送的消息
- retryFailedMessages:每 30 秒检查一次需要重试的消息
- cleanOldMessages:按配置的间隔清理过期消息
业务逻辑
@Service
public class OrderServiceImpl implements OrderService {
@Autowired
private OrderRepository orderRepository;
@Autowired
private MessageService messageService;
@Override
@Transactional
public Order createOrder(String userId, Double amount) {
// 创建订单
Order order = new Order();
order.setId(UUID.randomUUID().toString().replace("-", ""));
order.setUserId(userId);
order.setAmount(amount);
order.setStatus("CREATED");
order.setCreateTime(LocalDateTime.now());
order.setUpdateTime(LocalDateTime.now());
// 保存订单
orderRepository.save(order);
// 在同一事务中保存消息(关键:确保订单创建和消息记录在同一个本地事务中)
String messageContent = String.format("{\"orderId\":\"%s\",\"userId\":\"%s\",\"amount\":%.2f}",
order.getId(), userId, amount);
messageService.saveMessage(messageContent, "order.created");
return order;
}
}
消息消费处理
@Component
public class OrderCreatedConsumer {
@Autowired
private PaymentService paymentService;
@Autowired
private MessageService messageService;
@Autowired
private ObjectMapper objectMapper;
@RabbitListener(queues = "order.created")
public void handleOrderCreated(String message) {
try {
// 解析消息内容
Map<String, Object> messageData = objectMapper.readValue(message, Map.class);
String orderId = (String) messageData.get("orderId");
String userId = (String) messageData.get("userId");
Double amount = Double.parseDouble(messageData.get("amount").toString());
// 处理订单创建事件(例如:创建支付记录)
paymentService.createPayment(orderId, userId, amount);
// 可以在这里添加其他业务逻辑,如扣减库存、发送通知等
} catch (Exception e) {
// 消费失败,记录日志
System.err.println("Failed to process order created message: " + e.getMessage());
// 注意:RabbitMQ默认会重试,可能导致消息重复消费,需要在业务层处理幂等性
}
}
}
@Service
public class PaymentServiceImpl implements PaymentService {
@Override
@Transactional
public void createPayment(String orderId, String userId, Double amount) {
// 实现支付逻辑
// 例如:创建支付记录、调用支付网关、更新订单状态等
System.out.println("Creating payment for order: " + orderId + ", amount: " + amount);
// 这里只是示例,实际应用中需要根据业务需求实现具体逻辑
}
}
- @RabbitListener注解:监听指定队列的消息
- 业务处理:根据消息内容执行业务逻辑
幂等性设计:消费端需要确保业务操作的幂等性,防止重复消费导致的数据不一致。
解决接口幂等问题,只需要记住一句口令”一锁、二判、三更新”,只要严格遵守这个过程,那么就可以解决并发问题。
//一锁:先加一个分布式锁
@DistributeLock(scene = "OEDER", keyExpression = "#request.identifier", expire = 3000)
public OrderResponse apply(OrderRequest request) {
OrderResponse response = new OrderResponse();
//二判:判断请求是否执行成功过
OrderDTO orderDTO = orderService.queryOrder(request.getProduct(), request.getIdentifier());
if (orderDTO != null) {
response.setSuccess(true);
response.setResponseCode("DUPLICATED");
return response;
}
//三更新:执行更新的业务逻辑
return orderService.order(request);
}
总结
本地消息表方案的核心在于:
- 将业务操作和消息记录放在同一个本地事务中,确保原子性
- 通过定时任务异步发送消息,避免阻塞业务流程
- 实现消息重试机制,提高消息发送成功率
- 消费端实现幂等性,保证数据一致性