saga分布式事务

2023-12-10 10:06
文章标签 分布式 事务 saga

本文主要是介绍saga分布式事务,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、开篇

        在支付和交易业务中,会涉及长事务的场景。依靠单一的数据库事务无法解决整体问题,这个时候,就需要分布式事务来解决这个问题。

二、技术选型

1.saga

        Saga是一种在分布式系统中处理事务的模式,它通过将一个大的事务拆分为一系列小的、相互关联的子事务来实现。每个子事务独立执行,并且可以具有回滚和补偿机制,以保证整个事务的一致性。

子事务执行协调方式

编排模式

               

        基于事件,整个业务流程散落到各个业务系统中,比较复杂,流程难以全局理解,而且需要下游实现事件接收和发放。

控制模式

         Saga提供一个控制类,其方便参与者之前的协调工作。控制类协调整个流程,下游可以无感知迁移。

2.2pc

                                                成功情况

                                        失败情况

 

        分布式事务2PC(Two-Phase Commit)是一种用于在分布式系统中保持事务一致性的协议。它是一种基于协调者(Coordinator)和参与者(Participant)之间的交互来实现的。

        2PC协议的主要目标是在分布式环境下确保所有参与者要么都提交事务,要么都回滚事务,以保持全局事务的一致性。

三、技术实现

基于axon框架实现(控制模式)

OrderFacadeService 

订单服务facade层

/**
* 订单服务app层,1.发起创建订单 2.创建订单axon控制器。
*/
@Aggregate(cache = "orderCache")
public class OrderFacadeServiceImpl implements OrdreFacadeService{@Autowiredprivate CommandGateway commandGateWay;@Autowiredprivate PaymentFacadeService paymentClient;@Autowiredprivate LogisticsFacadeService logisticsClient;/*** 创建订单,OrderDomainService会处理对应命令*/@Overridepublic CreateOrderRespDto createOrder(CreateOrderReqDto req){var command = new CreateOrderCommand(req);return commandGateway.sendAndWait(command);}/*** 订单创建成功后创建支付*/@EventSourcingHandlerpublic void on(OrderCreatedEvent event) {var resp = paymentClient.createPay(CreatePaymentReqDto.convertTo(event));if(resp.success){// 发送支付成功事件var event= new PayOrderCreatedEvent(resp.getPaymentOrder());AggregateLifecycle.apply(event);}else{//TODO 发起创建失败事件}}/*** 支付成功后创建物流*/@EventSourcingHandlerpublic void on(PayOrderCreatedEvent event {var resp = logisticsClient.createPay(CreateLogisticsReqDto.convertTo(event));if(resp.success){// 发送创建订单结束var event= new OrderCreateEndEvent(resp.getPaymentOrder());AggregateLifecycle.apply(event);}else{//TODO 发起创建失败事件}}/***  订单创建结束后,OrderDomainService会处理对应命令*/@EventSourcingHandlerpublic void on(OrderCreateEndEvent event) {var command = new OrderCreateEndCommand(event);return commandGateway.sendAndWait(command);}}

OrderDomainService

订单领域层服务

@Service
@Aggregate(cache = "orderCache")
public class OrderDomainServiceImpl implements OrderDomainService{private OrderRepository orderRepository;/*** 创建订单*/@CommandHandler@Overridepublic OrderCreateRespDto handle(OrderCreateCommand command){Order order = Order.convertTo(command);// 存储订单orderRepository.save(order);// 发布订单已创建事件AggregateLifecycle.apply(new OrderCreatedEvent(order));return OrderCreateRespDto.convertTo(order, command);}/*** 创建订单结束*/@CommandHandler@Overridepublic CreateOrderEndRespDto handle(CreateOrderEndCommand command){Order order = Order.convertTo(command);// 更新订单orderRepository.updateCreateEnd(order);return CreateOrderEndRespDto.convertTo(order, command);}
}

todo: 在创建订单完成时候还可以再发起超时事件(延迟消息),延迟校验创建结果。

基于axon框架实现(编排模式)

OrderFacadeService 

订单服务facade层

@Aggregate(cache = "orderCache")
public class OrderFacadeServiceImpl implements OrdreFacadeService{@Autowiredprivate CommandGateway commandGateWay;/*** 创建订单,OrderDomainService会处理对应命令*/@Overridepublic CreateOrderRespDto createOrder(CreateOrderReqDto req){var command = new CreateOrderCommand(req);return commandGateway.sendAndWait(command);}/***  订单创建结束后,OrderDomainService会处理对应命令*/@EventSourcingHandlerpublic void on(OrderCreateEndEvent event) {var command = new OrderCreateEndCommand(event);return commandGateway.sendAndWait(command);}}

OrderDomainService

订单领域层服务

@Service
@Aggregate(cache = "orderCache")
public class OrderDomainServiceImpl implements OrderDomainService{private OrderRepository orderRepository;/*** 开始创建订单*/ @CommandHandler@Overridepublic OrderCreateRespDto handle(OrderCreateCommand command){Order order = Order.convertTo(command);// 存储订单orderRepository.save(order);// 发布订单已创建事件,由支付服务订阅AggregateLifecycle.apply(new OrderCreatedEvent(order));return OrderCreateRespDto.convertTo(order, command);}/** 订单创建结束*/    @CommandHandler@Overridepublic CreateOrderEndRespDto handle(CreateOrderEndCommand command){Order order = Order.convertTo(command);// 更新订单orderRepository.updateCreateEnd(order);return CreateOrderEndRespDto.convertTo(order, command);}
}

PaymentFacadeService

支付服务facade层

@Aggregate(cache = "orderCache")
public class PayFacadeServiceImpl implements PayFacadeService{@Autowiredprivate CommandGateway commandGateWay;/*** 订阅订单创建成功事件*/@EventSourcingHandlerpublic void on(OrderCreatedEvent event) {var command = new CreatePaymentCommand(event);commandGateway.sendAndWait(command);}}

PaymentDomainService

支付服务领域层

@Service
@Aggregate(cache = "orderCache")
public class PaymentDomainServiceImpl implements PaymentDomainService{private PaymentRepository paymentRepository;@CommandHandler@Overridepublic CreatePaymentEndRespDto handle(PaymentCreatCommand command){PaymentOrder order = PaymentOrder.convertTo(command);// 存储订单订单paymentRepository.save(order);// 发布已支付事件AggregateLifecycle.apply(new PaymentCreatedEvent(order));return CreatePaymentEndRespDto .convertTo(order, command);}
}

LogisticsFacadeService 和 LogisticsDomainService 

可以参考上面实现

两种模式比较

编排模式比较简单,通过一个控制器可以看整个流程,而且不需要下游接入axon框架。

这篇关于saga分布式事务的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/476792

相关文章

深入理解Apache Kafka(分布式流处理平台)

《深入理解ApacheKafka(分布式流处理平台)》ApacheKafka作为现代分布式系统中的核心中间件,为构建高吞吐量、低延迟的数据管道提供了强大支持,本文将深入探讨Kafka的核心概念、架构... 目录引言一、Apache Kafka概述1.1 什么是Kafka?1.2 Kafka的核心概念二、Ka

通过Spring层面进行事务回滚的实现

《通过Spring层面进行事务回滚的实现》本文主要介绍了通过Spring层面进行事务回滚的实现,包括声明式事务和编程式事务,具有一定的参考价值,感兴趣的可以了解一下... 目录声明式事务回滚:1. 基础注解配置2. 指定回滚异常类型3. ​不回滚特殊场景编程式事务回滚:1. ​使用 TransactionT

Python FastAPI+Celery+RabbitMQ实现分布式图片水印处理系统

《PythonFastAPI+Celery+RabbitMQ实现分布式图片水印处理系统》这篇文章主要为大家详细介绍了PythonFastAPI如何结合Celery以及RabbitMQ实现简单的分布式... 实现思路FastAPI 服务器Celery 任务队列RabbitMQ 作为消息代理定时任务处理完整

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

redis+lua实现分布式限流的示例

《redis+lua实现分布式限流的示例》本文主要介绍了redis+lua实现分布式限流的示例,可以实现复杂的限流逻辑,如滑动窗口限流,并且避免了多步操作导致的并发问题,具有一定的参考价值,感兴趣的可... 目录为什么使用Redis+Lua实现分布式限流使用ZSET也可以实现限流,为什么选择lua的方式实现

Seata之分布式事务问题及解决方案

《Seata之分布式事务问题及解决方案》:本文主要介绍Seata之分布式事务问题及解决方案,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Seata–分布式事务解决方案简介同类产品对比环境搭建1.微服务2.SQL3.seata-server4.微服务配置事务模式1

MYSQL事务死锁问题排查及解决方案

《MYSQL事务死锁问题排查及解决方案》:本文主要介绍Java服务报错日志的情况,并通过一系列排查和优化措施,最终发现并解决了服务假死的问题,文中通过代码介绍的非常详细,需要的朋友可以参考下... 目录问题现象推测 1 - 客户端无错误重试配置推测 2 - 客户端超时时间过短推测 3 - mysql 版本问

java如何分布式锁实现和选型

《java如何分布式锁实现和选型》文章介绍了分布式锁的重要性以及在分布式系统中常见的问题和需求,它详细阐述了如何使用分布式锁来确保数据的一致性和系统的高可用性,文章还提供了基于数据库、Redis和Zo... 目录引言:分布式锁的重要性与分布式系统中的常见问题和需求分布式锁的重要性分布式系统中常见的问题和需求

Redis事务与数据持久化方式

《Redis事务与数据持久化方式》该文档主要介绍了Redis事务和持久化机制,事务通过将多个命令打包执行,而持久化则通过快照(RDB)和追加式文件(AOF)两种方式将内存数据保存到磁盘,以防止数据丢失... 目录一、Redis 事务1.1 事务本质1.2 数据库事务与redis事务1.2.1 数据库事务1.