saga分布式事务

2023-12-10 10:06
文章标签 分布式 事务 saga

本文主要是介绍saga分布式事务,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、开篇

        在支付和交易业务中,会涉及长事务的场景。依靠单一的数据库事务无法解决整体问题,这个时候,就需要分布式事务来解决这个问题。

二、技术选型

1.saga

        Saga是一种在分布式系统中处理事务的模式,它通过将一个大的事务拆分为一系列小的、相互关联的子事务来实现。每个子事务独立执行,并且可以具有回滚和补偿机制,以保证整个事务的一致性。

子事务执行协调方式

编排模式

               

        基于事件,整个业务流程散落到各个业务系统中,比较复杂,流程难以全局理解,而且需要下游实现事件接收和发放。

控制模式

         Saga提供一个控制类,其方便参与者之前的协调工作。控制类协调整个流程,下游可以无感知迁移。

2.2pc

                                                成功情况

                                        失败情况

 

        分布式事务2PC(Two-Phase Commit)是一种用于在分布式系统中保持事务一致性的协议。它是一种基于协调者(Coordinator)和参与者(Participant)之间的交互来实现的。

        2PC协议的主要目标是在分布式环境下确保所有参与者要么都提交事务,要么都回滚事务,以保持全局事务的一致性。

三、技术实现

基于axon框架实现(控制模式)

OrderFacadeService 

订单服务facade层

/**
* 订单服务app层,1.发起创建订单 2.创建订单axon控制器。
*/
@Aggregate(cache = "orderCache")
public class OrderFacadeServiceImpl implements OrdreFacadeService{@Autowiredprivate CommandGateway commandGateWay;@Autowiredprivate PaymentFacadeService paymentClient;@Autowiredprivate LogisticsFacadeService logisticsClient;/*** 创建订单,OrderDomainService会处理对应命令*/@Overridepublic CreateOrderRespDto createOrder(CreateOrderReqDto req){var command = new CreateOrderCommand(req);return commandGateway.sendAndWait(command);}/*** 订单创建成功后创建支付*/@EventSourcingHandlerpublic void on(OrderCreatedEvent event) {var resp = paymentClient.createPay(CreatePaymentReqDto.convertTo(event));if(resp.success){// 发送支付成功事件var event= new PayOrderCreatedEvent(resp.getPaymentOrder());AggregateLifecycle.apply(event);}else{//TODO 发起创建失败事件}}/*** 支付成功后创建物流*/@EventSourcingHandlerpublic void on(PayOrderCreatedEvent event {var resp = logisticsClient.createPay(CreateLogisticsReqDto.convertTo(event));if(resp.success){// 发送创建订单结束var event= new OrderCreateEndEvent(resp.getPaymentOrder());AggregateLifecycle.apply(event);}else{//TODO 发起创建失败事件}}/***  订单创建结束后,OrderDomainService会处理对应命令*/@EventSourcingHandlerpublic void on(OrderCreateEndEvent event) {var command = new OrderCreateEndCommand(event);return commandGateway.sendAndWait(command);}}

OrderDomainService

订单领域层服务

@Service
@Aggregate(cache = "orderCache")
public class OrderDomainServiceImpl implements OrderDomainService{private OrderRepository orderRepository;/*** 创建订单*/@CommandHandler@Overridepublic OrderCreateRespDto handle(OrderCreateCommand command){Order order = Order.convertTo(command);// 存储订单orderRepository.save(order);// 发布订单已创建事件AggregateLifecycle.apply(new OrderCreatedEvent(order));return OrderCreateRespDto.convertTo(order, command);}/*** 创建订单结束*/@CommandHandler@Overridepublic CreateOrderEndRespDto handle(CreateOrderEndCommand command){Order order = Order.convertTo(command);// 更新订单orderRepository.updateCreateEnd(order);return CreateOrderEndRespDto.convertTo(order, command);}
}

todo: 在创建订单完成时候还可以再发起超时事件(延迟消息),延迟校验创建结果。

基于axon框架实现(编排模式)

OrderFacadeService 

订单服务facade层

@Aggregate(cache = "orderCache")
public class OrderFacadeServiceImpl implements OrdreFacadeService{@Autowiredprivate CommandGateway commandGateWay;/*** 创建订单,OrderDomainService会处理对应命令*/@Overridepublic CreateOrderRespDto createOrder(CreateOrderReqDto req){var command = new CreateOrderCommand(req);return commandGateway.sendAndWait(command);}/***  订单创建结束后,OrderDomainService会处理对应命令*/@EventSourcingHandlerpublic void on(OrderCreateEndEvent event) {var command = new OrderCreateEndCommand(event);return commandGateway.sendAndWait(command);}}

OrderDomainService

订单领域层服务

@Service
@Aggregate(cache = "orderCache")
public class OrderDomainServiceImpl implements OrderDomainService{private OrderRepository orderRepository;/*** 开始创建订单*/ @CommandHandler@Overridepublic OrderCreateRespDto handle(OrderCreateCommand command){Order order = Order.convertTo(command);// 存储订单orderRepository.save(order);// 发布订单已创建事件,由支付服务订阅AggregateLifecycle.apply(new OrderCreatedEvent(order));return OrderCreateRespDto.convertTo(order, command);}/** 订单创建结束*/    @CommandHandler@Overridepublic CreateOrderEndRespDto handle(CreateOrderEndCommand command){Order order = Order.convertTo(command);// 更新订单orderRepository.updateCreateEnd(order);return CreateOrderEndRespDto.convertTo(order, command);}
}

PaymentFacadeService

支付服务facade层

@Aggregate(cache = "orderCache")
public class PayFacadeServiceImpl implements PayFacadeService{@Autowiredprivate CommandGateway commandGateWay;/*** 订阅订单创建成功事件*/@EventSourcingHandlerpublic void on(OrderCreatedEvent event) {var command = new CreatePaymentCommand(event);commandGateway.sendAndWait(command);}}

PaymentDomainService

支付服务领域层

@Service
@Aggregate(cache = "orderCache")
public class PaymentDomainServiceImpl implements PaymentDomainService{private PaymentRepository paymentRepository;@CommandHandler@Overridepublic CreatePaymentEndRespDto handle(PaymentCreatCommand command){PaymentOrder order = PaymentOrder.convertTo(command);// 存储订单订单paymentRepository.save(order);// 发布已支付事件AggregateLifecycle.apply(new PaymentCreatedEvent(order));return CreatePaymentEndRespDto .convertTo(order, command);}
}

LogisticsFacadeService 和 LogisticsDomainService 

可以参考上面实现

两种模式比较

编排模式比较简单,通过一个控制器可以看整个流程,而且不需要下游接入axon框架。

这篇关于saga分布式事务的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/476792

相关文章

集中式版本控制与分布式版本控制——Git 学习笔记01

什么是版本控制 如果你用 Microsoft Word 写过东西,那你八成会有这样的经历: 想删除一段文字,又怕将来这段文字有用,怎么办呢?有一个办法,先把当前文件“另存为”一个文件,然后继续改,改到某个程度,再“另存为”一个文件。就这样改着、存着……最后你的 Word 文档变成了这样: 过了几天,你想找回被删除的文字,但是已经记不清保存在哪个文件了,只能挨个去找。真麻烦,眼睛都花了。看

MySql 事务练习

事务(transaction) -- 事务 transaction-- 事务是一组操作的集合,是一个不可分割的工作单位,事务会将所有的操作作为一个整体一起向系统提交或撤销请求-- 事务的操作要么同时成功,要么同时失败-- MySql的事务默认是自动提交的,当执行一个DML语句,MySql会立即自动隐式提交事务-- 常见案例:银行转账-- 逻辑:A给B转账1000:1.查询

开源分布式数据库中间件

转自:https://www.csdn.net/article/2015-07-16/2825228 MyCat:开源分布式数据库中间件 为什么需要MyCat? 虽然云计算时代,传统数据库存在着先天性的弊端,但是NoSQL数据库又无法将其替代。如果传统数据易于扩展,可切分,就可以避免单机(单库)的性能缺陷。 MyCat的目标就是:低成本地将现有的单机数据库和应用平滑迁移到“云”端

Lua 脚本在 Redis 中执行时的原子性以及与redis的事务的区别

在 Redis 中,Lua 脚本具有原子性是因为 Redis 保证在执行脚本时,脚本中的所有操作都会被当作一个不可分割的整体。具体来说,Redis 使用单线程的执行模型来处理命令,因此当 Lua 脚本在 Redis 中执行时,不会有其他命令打断脚本的执行过程。脚本中的所有操作都将连续执行,直到脚本执行完成后,Redis 才会继续处理其他客户端的请求。 Lua 脚本在 Redis 中原子性的原因

laravel框架实现redis分布式集群原理

在app/config/database.php中配置如下: 'redis' => array('cluster' => true,'default' => array('host' => '172.21.107.247','port' => 6379,),'redis1' => array('host' => '172.21.107.248','port' => 6379,),) 其中cl

基于MySQL实现的分布式锁

概述 在单机时代,虽然不需要分布式锁,但也面临过类似的问题,只不过在单机的情况下,如果有多个线程要同时访问某个共享资源的时候,我们可以采用线程间加锁的机制,即当某个线程获取到这个资源后,就立即对这个资源进行加锁,当使用完资源之后,再解锁,其它线程就可以接着使用了。例如,在JAVA中,甚至专门提供了一些处理锁机制的一些API(synchronize/Lock等)。 但是到了分布式系统的时代,这种

Kafka 分布式消息系统详细介绍

Kafka 分布式消息系统 一、Kafka 概述1.1 Kafka 定义1.2 Kafka 设计目标1.3 Kafka 特点 二、Kafka 架构设计2.1 基本架构2.2 Topic 和 Partition2.3 消费者和消费者组2.4 Replica 副本 三、Kafka 分布式集群搭建3.1 下载解压3.1.1 上传解压 3.2 修改 Kafka 配置文件3.2.1 修改zookeep

spring事务属性的xml格式配置

实际是使用代理做的事务优化 <!--配置事务的属性--><tx:advice id="txAdvice" transaction-manager="transactionManager"> <tx:attributes> <!--匹配所有以add开头的方法--><tx:method name="add*" propagation="REQUIRED" /> <tx:metho

Spring Cloud整合Seata实现分布式事务

文章目录 1.Seata1.1 官网1.2 下载1.3 通过安装包运行seata1.3.1 解压seata-server-1.3.0.zip1.3.2 修改 conf/file.conf 配置文件1.3.3 修改conf/registry.conf配置文件1.3.4 添加seata配置信息到nacos1.3.5 配置seata服务端数据库表结构1.3.6 启动seata 2.Spring

ELK+Spring Cloud搭建分布式日志中心

ELK+Spring Cloud搭建分布式日志中心 1.ELK简介2.资源包下载3.Elasticsearch安装3.1 解压Elasticsearch3.2 修改Elasticsearch的配置文件3.3 修改系统配置3.4 启动Elasticsearch 4.ElasticSearch-head插件安装5.Logstash安装6.Kibana安装7.SpringCloud集成logsta