命运交织的节点:分布式事务最终一致性的心跳共鸣纪实

本文主要是介绍命运交织的节点:分布式事务最终一致性的心跳共鸣纪实,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

关注微信公众号 “程序员小胖” 每日技术干货,第一时间送达!

引言

在当今云计算和微服务架构大行其道的时代,分布式系统成为了构建高可用、高性能应用的基石。然而,随着系统规模的扩张,数据的一致性问题如同幽灵般萦绕在每位架构师心头,尤其是分布式事务处理中的挑战更是首当其冲。今天,让我们一起深入探索分布式事务模型中的“最终一致性”,揭开它那既神秘又强大的面纱。

分布式事务的挑战与背景

想象一下双十一购物节,数百万用户同时下单,订单系统、库存系统、支付系统等多个服务间需要协同完成交易。采用最终一致性模型,即使瞬间请求激增导致部分操作延迟,系统也能确保在合理的时间框架内调整库存、确认订单状态,从而维持整体业务流程的顺畅。

最终一致性?

在分布式系统中,最终一致性是一种事务模型,它保证系统中的所有数据副本最终会达到一致的状态,但不保证立即的一致性。这种模型允许在数据复制过程中存在短暂的不一致状态,但随着时间的推移,系统会通过各种机制确保数据最终达到一致。

实现策略

补偿事务(TCC)

TCC,即Try-Confirm-Cancel,是一种通过预先定义的确认和取消操作来保证事务最终一致性的模式。

**Try 阶段:**调用 Try 接口,尝试执行业务,完成所有业务检查,预留业务资源。
Confirm 或 Cancel 阶段:两者是互斥的,只能进入其中一个,并且都满足幂等性,允许失败重试。

**Confirm 操作:**对业务系统做确认提交,确认执行业务操作,不做其他业务检查,只使用 Try 阶段预留的业务资
源。

**Cancel 操作:**在业务执行错误,需要回滚的状态下执行业务取消,释放预留资源。

转账场景示例:


//Account类代表一个账户,拥有冻结、解冻、存款和取款的方法。Money类代表金额。
public class AccountService {// Try阶段:检查账户余额并冻结资金public boolean prepareTransfer(Account source, Account target, Money amount) {if (source.getBalance() < amount) {return false; // 余额不足}source.freeze(amount); // 冻结资金return true;}// Confirm阶段:实际转账操作public void confirmTransfer(Account source, Account target, Money amount) {source.withdraw(amount); // 从源账户扣除金额target.deposit(amount); // 向目标账户增加金额}// Cancel阶段:回滚操作,解冻资金public void cancelTransfer(Account source, Account target, Money amount) {source.unfreeze(amount); // 解冻资金}
}

注意事项

**幂等性:**确保Try、Confirm和Cancel操作都是幂等的,以支持重复执行而不会引起副作用。

**空回滚:**系统应能够处理“空回滚”的情况,即Cancel操作被调用,但Try操作并未实际执行。

**防悬挂:**确保系统能够处理悬挂操作,即Try操作在网络延迟后到达,而Cancel操作已经执行。

本地消息表

本地消息表的方案最初是由 ebay 的工程师提出,核心思想是将分布式事务拆分成本地事务进行处理,通过消息日志的方式来异步执行。本地消息表是一种业务耦合的设计,消息生产方需要额外建一个事务消息表,并记录消息发送状态,消息消费方需要处理这个消息,并完成自己的业务逻辑,另外会有一个异步机制来定期扫描未完成的消息,确保最终一致性。

实战代码示例:

  1. 系统收到下单请求,将订单业务数据存入到订单库中,并且同时存储该订单对应的消息数据,比如购买商品的 ID 和数量,消息数据与订单库为同一库,更新订单和存储消息为一个本地事务,要么都成功,要么都失败。
   @Servicepublic class OrderService {@Resourceprivate OrderMapper orderMapper;@Resourceprivate OrderMessageMapper orderMessageMapper;@Autowiredprivate MessageProducer messageProducer; // 消息队列的发送器@Transactionalpublic void placeOrder(Order order, OrderMessage orderMessage) {// 将订单业务数据存入到订单库中int orderRows = orderMapper.insert(order);// 同时存储该订单对应的消息数据int messageRows = orderMessageMapper.insert(orderMessage);// 确保订单和消息数据都成功插入if (orderRows == 1 && messageRows == 1) {// 发送库存更新消息到消息队列messageProducer.sendMessage(orderMessage);} else {// 如果任何插入失败,抛出异常以回滚事务throw new RuntimeException("Order or message data insertion failed");}}}
  1. 库存服务更新
    @Autowiredprivate InventoryDomainService inventoryDomainService;@Overridepublic boolean consume(MqMessageEntity<OrderMessage> mqMessageEntity) {log.info("接收订单支付完成请求,扣件库存:{}", JSON.toJSONString(mqMessageEntity));return inventoryDomainService.deductionInventory(mqMessageEntity);}
  1. 订单服务更新本地消息表
        @Autowiredprivate OrderMessageMapper orderMessageMapper;public void sendMessage(OrderMessage orderMessage) {// 向消息队列发送库存更新消息// 消息发送成功的回调中更新本地消息表状态orderMessageMapper.upodateMessageStatus(orderMessage);}
  1. 异步任务重试机制
    使用Spring的@Scheduled注解来定时触发异步任务。这个地方用任何调度计划都可以实现 我用的是spring自带的@Scheduled注解实现的。异步技术也可以根据自己的情况选择。

@Component
public class MessageRetryScheduler {@Autowiredprivate MessageProducer messageProducer;@Scheduled(fixedRate = 60000) // 每60秒执行一次public void scheduleMessageRetry() {messageProducer.scanAndRetryUnsentMessages();}
}@Autowiredprivate OrderMessageMapper orderMessageMapper;@Asyncpublic void scanAndRetryUnsentMessages() {List<OrderMessageDO> unsentMessages = orderMessageMapper.queryByStatus("PENDING");for (OrderMessage message : unsentMessages) {try {sendMessage(message); // 重试发送消息orderMessageMapper.updateStatus(message);} catch (Exception e) {// 可以选择更新状态为错误或其他逻辑orderMessageMapper.updateStatus(message);}}}

RocketMQ 事务消息

RocketMQ 事务消息是一种支持分布式事务的消息。它通过引入 prepare、commit 和 rollback 三个阶段,来确保事务消息的一致性。

**prepare 阶段:**消息发送方发送半消息,此时消息的状态为“待提交”。

**commit 阶段:**消息发送方向 RocketMQ 发送 commit 消息请求,RocketMQ 判断此时半消息是否被确认,如果半消息已被确认,则将消息标记为“可消费”并提交事务。如果半消息未被确认,则将消息标记为“不可消费”并终止事务。

**rollback 阶段:**消息发送方向 RocketMQ 发送 rollback 消息请求,RocketMQ 将半消息标记为“不可消费”并回滚
事务。


代码示例:

创建并初始化一个事务消息生产者:

import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;public class TransactionProducer {public static void main(String[] args) throws Exception {// 实例化消息生产者,并指定NameServer地址TransactionMQProducer producer = new TransactionMQProducer("please_rename_unique_group_name");producer.setNamesrvAddr("localhost:9876");// 指定事务监听器producer.setTransactionListener(new TransactionListener() {@Overridepublic LocalTransactionState executeLocalTransaction(Message msg, Object arg) {// 执行本地事务逻辑,例如数据库操作// 假设执行成功,返回Commit状态return LocalTransactionState.CommitMessage;}@Overridepublic LocalTransactionState checkLocalTransaction(Message msg) {// 检查本地事务状态,确认是否需要提交或回滚// 这里可以根据业务逻辑来实现检查// 假设检查通过,返回Unknown状态,让消息服务决定是提交还是回滚return LocalTransactionState.Unknown;}});// 启动生产者producer.start();// 创建消息Message msg = new Message("TopicTest", "TagA", "Hello RocketMQ".getBytes());// 发送事务消息SendResult sendResult = producer.sendMessageInTransaction(msg, null);System.out.printf("%s%n", sendResult);// 关闭生产者producer.shutdown();}
}

在代码示例中我们实现了TransactionListener接口的两个方法:

**executeLocalTransaction:**执行本地事务逻辑,返回事务状态。如果本地事务执行成功,返回CommitMessage;如果执行失败,返回RollbackMessage。

**checkLocalTransaction:**检查本地事务状态。如果事务状态未知,返回Unknown,让消息队列服务决定是提交还是回滚消息。

最大努力通知

最大努力通知型( Best-effort delivery)是最简单的一种柔性事务,适用于一些最终一致性时间敏感度低
的业务,且被动方处理结果 不影响主动方的处理结果。典型的使用场景:如银行通知、商户通知等。
最大努力通知型的实现方案,一般符合以下特点:

  1. 不可靠消息:业务活动主动方,在完成业务处理之后,向业务活动的被动方发送消息,直到通知N次后不再通知,允许消息丢失(不可靠消息)。
  2. 定期校对:业务活动的被动方,根据定时策略,向业务活动主动方查询(主动方提供查询接口),恢复丢失的业务消息

代码示例:

发送通知

@Service
public class NotificationService {@Autowiredprivate RabbitTemplate rabbitTemplate;public void sendNotification(String message) {// 发送通知消息到MQrabbitTemplate.convertAndSend("notificationExchange", "notificationRoutingKey", message);}
}

监听消息并处理

@Component
public class NotificationListener {@RabbitListener(queues = "notificationQueue")public void handleNotification(String message) {// 处理消息,例如更新库存processNotification(message);// 确认消息已处理acknowledgeMessage();}private void processNotification(String message) {// 业务逻辑处理}private void acknowledgeMessage() {// 向发送方确认消息已处理的逻辑}
}

重试机制通常由消息中间件提供,如RabbitMQ的死信队列和重试策略。校对机制可能需要额外的接口和逻辑来实现。

结语

最终一致性作为分布式系统中一种重要的事务处理哲学,它在实践中展现出了强大的生命力。然而,没有银弹存在,每种模型都有其适用场景与局限。作为技术探索者,我们应当持续思考如何更精细地控制一致性级别,结合业务特性量体裁衣,设计出既能满足业务需求又能保持系统弹性的解决方案。那么,您在实际项目中遇到过哪些分布式事务的挑战?对于最终一致性模型又有何独到见解或疑问呢?欢迎留言讨论,共同推进技术的边界。

这篇关于命运交织的节点:分布式事务最终一致性的心跳共鸣纪实的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/975106

相关文章

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

redis+lua实现分布式限流的示例

《redis+lua实现分布式限流的示例》本文主要介绍了redis+lua实现分布式限流的示例,可以实现复杂的限流逻辑,如滑动窗口限流,并且避免了多步操作导致的并发问题,具有一定的参考价值,感兴趣的可... 目录为什么使用Redis+Lua实现分布式限流使用ZSET也可以实现限流,为什么选择lua的方式实现

Seata之分布式事务问题及解决方案

《Seata之分布式事务问题及解决方案》:本文主要介绍Seata之分布式事务问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Seata–分布式事务解决方案简介同类产品对比环境搭建1.微服务2.SQL3.seata-server4.微服务配置事务模式1

MYSQL事务死锁问题排查及解决方案

《MYSQL事务死锁问题排查及解决方案》:本文主要介绍Java服务报错日志的情况,并通过一系列排查和优化措施,最终发现并解决了服务假死的问题,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录问题现象推测 1 - 客户端无错误重试配置推测 2 - 客户端超时时间过短推测 3 - mysql 版本问

java如何分布式锁实现和选型

《java如何分布式锁实现和选型》文章介绍了分布式锁的重要性以及在分布式系统中常见的问题和需求,它详细阐述了如何使用分布式锁来确保数据的一致性和系统的高可用性,文章还提供了基于数据库、Redis和Zo... 目录引言:分布式锁的重要性与分布式系统中的常见问题和需求分布式锁的重要性分布式系统中常见的问题和需求

Redis事务与数据持久化方式

《Redis事务与数据持久化方式》该文档主要介绍了Redis事务和持久化机制,事务通过将多个命令打包执行,而持久化则通过快照(RDB)和追加式文件(AOF)两种方式将内存数据保存到磁盘,以防止数据丢失... 目录一、Redis 事务1.1 事务本质1.2 数据库事务与redis事务1.2.1 数据库事务1.

Golang使用etcd构建分布式锁的示例分享

《Golang使用etcd构建分布式锁的示例分享》在本教程中,我们将学习如何使用Go和etcd构建分布式锁系统,分布式锁系统对于管理对分布式系统中共享资源的并发访问至关重要,它有助于维护一致性,防止竞... 目录引言环境准备新建Go项目实现加锁和解锁功能测试分布式锁重构实现失败重试总结引言我们将使用Go作

Redis分布式锁使用及说明

《Redis分布式锁使用及说明》本文总结了Redis和Zookeeper在高可用性和高一致性场景下的应用,并详细介绍了Redis的分布式锁实现方式,包括使用Lua脚本和续期机制,最后,提到了RedLo... 目录Redis分布式锁加锁方式怎么会解错锁?举个小案例吧解锁方式续期总结Redis分布式锁如果追求