本文主要是介绍Seata-彻底解决Spring Cloud中的分布式事务问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 一、分布式事务问题
- 二、Seata简介
- 三、Seata典型的分布式事务处理过程
- 四、seata-server的安装与配置(服务端)
- 五、数据库准备
- 导入seat-server数据库
- 创建业务数据库
- 六、模拟分布式事务问题
- 七、seata服务配置(客户端)
一、分布式事务问题
- 分布式事务
分布式事务可以理解成一个包含了若干分支事务的全局事务,全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。此外,通常分支事务本身就是一个满足ACID的本地事务。 - 单体应用
单体应用中,一个业务操作需要调用三个模块完成,此时数据的一致性由本地事务来保证。
- 微服务应用
随着业务需求的变化,单体应用被拆分成微服务应用,原来的三个模块被拆分成三个独立的应用,分别使用独立的数据源,业务操作需要调用三个服务来完成。此时每个服务内部的数据一致性由本地事务来保证,但是全局的数据一致性问题没法保证。
- 小结
在微服务架构中由于全局数据一致性没法保证产生的问题就是分布式事务问题。简单来说,一次业务操作需要操作多个数据源或需要进行远程调用,就会产生分布式事务问题。
二、Seata简介
Seata是Alibaba开源的一款分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式(默认是AT模式),为用户打造一站式的分布式解决方案。
Seata框架的应用模式为客户端-服务器模式,其对分布式事务的处理过程涉及三个组件:Transaction Coordinator(TC)、Transaction Manager(TM)和Resource Manager(RM)。
全称 | 简称 | 中文 | 作用 |
---|---|---|---|
Transaction Coordinator | TC | 事务协调器 | 维护全局事务的运行状态,负责协调并驱动全局事务的提交或回滚 |
Transaction Manager | TM | 事务管理器 | 控制全局事务的边界,负责开启一个全局事务,并最终发起全局提交或回滚的决议 |
Resource Manage | RM | 资源管理器 | 控制分支事务,负责分支注册、状态汇报,并接收事务协调器的指令,驱动分支(本地)事务的提交和回滚 |
三、Seata典型的分布式事务处理过程
- TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID
- XID 在微服务调用链路的上下文中传播
- RM 向 TC 注册分支事务,将其纳入 XID 对应全局事务的管辖
- TM 向 TC 发起针对 XID 的全局提交或回滚决议
- TC 调度 XID 下管辖的全部分支事务完成提交或回滚请求
TM 下达全局事务的开启、提交和回滚指令给 TC,TC 接收到 TM 下达的指令后向 RM 发布调度指令,RM 接收到 TC 发布的指令后执行提交和回滚操作。
四、seata-server的安装与配置(服务端)
- 从官网下载seata-server,下载地址:https://github.com/seata/seata/releases,解压后如下图:
- 修改conf目录下的file.conf配置文件,主要修改自定义事务组名称,事务日志存储模式为db及数据库连接信息
service {#vgroup->rgroupvgroupMapping.fsp_tx_group = "default" #修改事务组名称为:fsp_tx_group,和客户端自定义的名称对应#only support single nodedefault.grouplist = "127.0.0.1:8091"#degrade current not supportenableDegrade = false#disabledisable = false#unit ms,s,m,h,d represents milliseconds, seconds, minutes, hours, days, default permanentmax.commit.retry.timeout = "-1"max.rollback.retry.timeout = "-1"
}
## transaction log store, only used in seata-server
store {## store mode: file、db、redismode = "db" #修改此处将事务信息存储到数据库中## file store propertyfile {## store location dirdir = "sessionStore"# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptionsmaxBranchSessionSize = 16384# globe session size , if exceeded throws exceptionsmaxGlobalSessionSize = 512# file buffer size , if exceeded allocate new bufferfileWriteBufferCacheSize = 16384# when recover batch read sizesessionReloadReadSize = 100# async, syncflushDiskMode = async}## database store propertydb {## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.datasource = "druid"## mysql/oracle/postgresql/h2/oceanbase etc.dbType = "mysql"driverClassName = "com.mysql.cj.jdbc.Driver"url = "jdbc:mysql://localhost:3306/seat-server?serverTimezone=GMT%2B8"user = "root"password = "root"minConn = 5maxConn = 100globalTable = "global_table"branchTable = "branch_table"lockTable = "lock_table"queryLimit = 100maxWait = 5000}## redis store propertyredis {host = "127.0.0.1"port = "6379"password = ""database = "0"minConn = 1maxConn = 10maxTotal = 100queryLimit = 100}}
注意:因为我本地安装的mysql版本为8.0.21,所以driverClassName为“com.mysql.cj.jdbc.Driver”,且url后要添加“serverTimezone=GMT%2B8”,而mysql8之前的版本driverClassName为“com.mysql.jdbc.Driver”。
3. 修改conf目录下的registry.conf配置文件,指明注册中心为nacos,及修改nacos连接信息即可。
registry {# file 、nacos 、eureka、redis、zk、consul、etcd3、sofatype = "nacos" #改为nacosloadBalance = "RandomLoadBalance"loadBalanceVirtualNodes = 10nacos {application = "seata-server"serverAddr = "127.0.0.1:8848"group = "SEATA_GROUP"namespace = ""cluster = "default"username = ""password = ""}eureka {serviceUrl = "http://localhost:8761/eureka"application = "default"weight = "1"}redis {serverAddr = "localhost:6379"db = 0password = ""cluster = "default"timeout = 0}zk {cluster = "default"serverAddr = "127.0.0.1:2181"sessionTimeout = 6000connectTimeout = 2000username = ""password = ""}consul {cluster = "default"serverAddr = "127.0.0.1:8500"}etcd3 {cluster = "default"serverAddr = "http://localhost:2379"}sofa {serverAddr = "127.0.0.1:9603"application = "default"region = "DEFAULT_ZONE"datacenter = "DefaultDataCenter"cluster = "default"group = "SEATA_GROUP"addressWaitTime = "3000"}file {name = "file.conf"}
}config {# file、nacos 、apollo、zk、consul、etcd3type = "file"nacos {serverAddr = "127.0.0.1:8848" #改为nacos的连接地址namespace = ""group = "SEATA_GROUP"username = ""password = ""}consul {serverAddr = "127.0.0.1:8500"}apollo {appId = "seata-server"apolloMeta = "http://192.168.1.204:8801"namespace = "application"apolloAccesskeySecret = ""}zk {serverAddr = "127.0.0.1:2181"sessionTimeout = 6000connectTimeout = 2000username = ""password = ""}etcd3 {serverAddr = "http://localhost:2379"}file {name = "file.conf"}
}
- 先启动Nacos,再使用seata-server中/bin/seata-server.bat文件启动seata-server。
五、数据库准备
导入seat-server数据库
由于我们使用了db模式存储事务日志,所以我们需要创建一个seat-server数据库,建表db_store.sql、db_undo_log.sql
(注意,自1.0版本后,seata-server解压后的conf文件夹下已经没有db_store.sql和db_undo_log.sql)。
db_store.sql
drop table if exists `global_table`;
create table `global_table` (`xid` varchar(128) not null,`transaction_id` bigint,`status` tinyint not null,`application_id` varchar(32),`transaction_service_group` varchar(32),`transaction_name` varchar(128),`timeout` int,`begin_time` bigint,`application_data` varchar(2000),`gmt_create` datetime,`gmt_modified` datetime,primary key (`xid`),key `idx_gmt_modified_status` (`gmt_modified`, `status`),key `idx_transaction_id` (`transaction_id`)
);drop table if exists `branch_table`;
create table `branch_table` (`branch_id` bigint not null,`xid` varchar(128) not null,`transaction_id` bigint ,`resource_group_id` varchar(32),`resource_id` varchar(256) ,`lock_key` varchar(128) ,`branch_type` varchar(8) ,`status` tinyint,`client_id` varchar(64),`application_data` varchar(2000),`gmt_create` datetime,`gmt_modified` datetime,primary key (`branch_id`),key `idx_xid` (`xid`)
);drop table if exists `lock_table`;
create table `lock_table` (`row_key` varchar(128) not null,`xid` varchar(96),`transaction_id` long ,`branch_id` long,`resource_id` varchar(256) ,`table_name` varchar(32) ,`pk` varchar(36) ,`gmt_create` datetime ,`gmt_modified` datetime,primary key(`row_key`)
);
db_undo_log.sql
CREATE TABLE `undo_log` (`id` bigint(20) NOT NULL AUTO_INCREMENT,`branch_id` bigint(20) NOT NULL,`xid` varchar(100) NOT NULL,`context` varchar(128) NOT NULL,`rollback_info` longblob NOT NULL,`log_status` int(11) NOT NULL,`log_created` datetime NOT NULL,`log_modified` datetime NOT NULL,`ext` varchar(100) DEFAULT NULL,PRIMARY KEY (`id`),UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
创建业务数据库
- seat-order:存储订单的数据库
- seat-storage:存储库存的数据库
- seat-account:存储账户信息的数据库
order表
CREATE TABLE `order` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`user_id` bigint(11) DEFAULT NULL COMMENT '用户id',`product_id` bigint(11) DEFAULT NULL COMMENT '产品id',`count` int(11) DEFAULT NULL COMMENT '数量',`money` decimal(11,0) DEFAULT NULL COMMENT '金额',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;ALTER TABLE `order` ADD COLUMN `status` int(1) DEFAULT NULL COMMENT '订单状态:0:创建中;1:已完结' AFTER `money` ;
storage表
CREATE TABLE `storage` (`id` bigint(11) NOT NULL AUTO_INCREMENT,`product_id` bigint(11) DEFAULT NULL COMMENT '产品id',`total` int(11) DEFAULT NULL COMMENT '总库存',`used` int(11) DEFAULT NULL COMMENT '已用库存',`residue` int(11) DEFAULT NULL COMMENT '剩余库存',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;INSERT INTO `seat-storage`.`storage` (`id`, `product_id`, `total`, `used`, `residue`) VALUES ('1', '1', '100', '0', '100');
account表
CREATE TABLE `account` (`id` bigint(11) NOT NULL AUTO_INCREMENT COMMENT 'id',`user_id` bigint(11) DEFAULT NULL COMMENT '用户id',`total` decimal(10,0) DEFAULT NULL COMMENT '总额度',`used` decimal(10,0) DEFAULT NULL COMMENT '已用余额',`residue` decimal(10,0) DEFAULT '0' COMMENT '剩余可用额度',PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;INSERT INTO `seat-account`.`account` (`id`, `user_id`, `total`, `used`, `residue`) VALUES ('1', '1', '1000', '0', '1000');
六、模拟分布式事务问题
创建三个服务,分别为订单服务(seata-order-service)、库存服务(seata-storage-service)和账户服务(seata-account-service)。
用户通过订单服务下单,订单服务会创建一个订单,然后远程调用库存服务扣减下单商品的库存,再远程调用账户服务扣减账户余额,最后订单状态更新为完成状态。
创建的三个服务为三个独立的应用,每个服务都使用自己的独立的数据源,每个服务内部的数据一致性由本地事务来保证。但上述操作,跨越三个数据库,有两次远程调用,故而会存在分布式事务问题。
七、seata服务配置(客户端)
三个服务配置大致相同,故以订单服务(seata-order-service)为例来说明。
- pom文件配置
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.2.RELEASE</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.macro.cloud</groupId><artifactId>seata-order-service</artifactId><version>0.0.1-SNAPSHOT</version><name>seata-order-service</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version><spring-cloud.version>Hoxton.SR8</spring-cloud.version><mysql-connector-java.version>8.0.21</mysql-connector-java.version><mybatis-spring-boot-starter.version>2.0.0</mybatis-spring-boot-starter.version><druid-spring-boot-starter.version>1.1.10</druid-spring-boot-starter.version><lombok.version>1.18.8</lombok.version><seata.version>1.3.0</seata.version></properties><dependencies><!--nacos--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId></dependency><!--seata--><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><exclusions><exclusion><artifactId>seata-all</artifactId><groupId>io.seata</groupId></exclusion></exclusions></dependency><dependency><groupId>io.seata</groupId><artifactId>seata-all</artifactId><version>${seata.version}</version></dependency><!--feign--><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-openfeign</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.mybatis.spring.boot</groupId><artifactId>mybatis-spring-boot-starter</artifactId><version>${mybatis-spring-boot-starter.version}</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>${mysql-connector-java.version}</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>druid-spring-boot-starter</artifactId><version>${druid-spring-boot-starter.version}</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>${lombok.version}</version></dependency></dependencies><dependencyManagement><dependencies><dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-dependencies</artifactId><version>${spring-cloud.version}</version><type>pom</type><scope>import</scope></dependency><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>2.2.5.RELEASE</version><type>pom</type><scope>import</scope></dependency></dependencies></dependencyManagement><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId></plugin></plugins></build></project>
如上述pom文件,我选用的Spring Cloud Version为Spring Cloud Hoxton.SR8,Spring Cloud Alibaba Version为2.2.5.RELEASE,Spring Boot Version为2.3.2.RELEASE。版本依赖冲突,可能会导致报错java.lang.IllegalArgumentException: no server available
。具体版本说明可以查看官网地址https://github.com/alibaba/spring-cloud-alibaba/wiki/%E7%89%88%E6%9C%AC%E8%AF%B4%E6%98%8E。
- application.yml文件
server:port: 8180
spring:application:name: seata-order-servicecloud:alibaba:seata:tx-service-group: fsp_tx_group #此处内容应与seata-server保持一致nacos:discovery:server-addr: localhost:8848datasource:driver-class-name: com.mysql.cj.jdbc.Driverpassword: rooturl: jdbc:mysql://localhost:3306/seat-order?serverTimezone=GMT%2B8username: root
feign:hystrix:enabled: false
logging:level:io:seata: info
mybatis:mapperLocations: classpath:mapper/*.xml
-
file.conf 和 registry.conf
将file.conf 和 registry.conf 复制到订单服务(seata-order-service)中。
注意:file.conf文件在seata1.0版本之前,service中第一行配置为vgroup_mapping
,而在1.0版本之后被修改为vgroupMapping
。因为我本地使用的seata-server版本为1.3.0,故而在拷贝之前需将vgroup_mapping
修改为vgroupMapping
,不然启动服务时将会报错java.lang.IllegalArgumentException: endpoint format should like ip:port
。 -
在启动类中取消数据源的自动创建,即添加注解
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
package com.macro.cloud;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;// 取消数据源的自动创建, 使用Seata对数据源进行代理
@SpringBootApplication(exclude = DataSourceAutoConfiguration.class)
@EnableDiscoveryClient
@EnableFeignClients
public class SeataOrderServiceApplication {public static void main(String[] args) {SpringApplication.run(SeataOrderServiceApplication.class, args);}}
- 使用Seata对数据源进行代理
package com.macro.cloud.config;import com.alibaba.druid.pool.DruidDataSource;
import io.seata.rm.datasource.DataSourceProxy;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.transaction.SpringManagedTransactionFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;import javax.sql.DataSource;/*** @Author 听秋* @Description //TODO 使用Seata对数据源进行代理* @Date 10:28 2021/3/19* @Param * @return**/
@Configuration
public class DataSourceProxyConfig {@Value("${mybatis.mapperLocations}")private String mapperLocations;@Bean// 读取配置文件中的配置。@ConfigurationProperties(prefix = "spring.datasource")public DataSource druidDataSource(){return new DruidDataSource();}@Beanpublic DataSourceProxy dataSourceProxy(DataSource dataSource) {return new DataSourceProxy(dataSource);}@Beanpublic SqlSessionFactory sqlSessionFactoryBean(DataSourceProxy dataSourceProxy) throws Exception {SqlSessionFactoryBean sqlSessionFactoryBean = new SqlSessionFactoryBean();sqlSessionFactoryBean.setDataSource(dataSourceProxy);sqlSessionFactoryBean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources(mapperLocations));sqlSessionFactoryBean.setTransactionFactory(new SpringManagedTransactionFactory());return sqlSessionFactoryBean.getObject();}}
- 使用@GlobalTransactional注解开启分布式事务
package com.macro.cloud.service.impl;import com.macro.cloud.dao.OrderDao;
import com.macro.cloud.domain.Order;
import com.macro.cloud.service.AccountService;
import com.macro.cloud.service.OrderService;
import com.macro.cloud.service.StorageService;
import io.seata.spring.annotation.GlobalTransactional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;/**1. @Author 听秋2. @Description //TODO 订单业务实现类3. @Date 10:32 2021/3/224. @Param5. @return**/
@Service
public class OrderServiceImpl implements OrderService {private static final Logger LOGGER = LoggerFactory.getLogger(OrderServiceImpl.class);@Autowiredprivate OrderDao orderDao;@Autowiredprivate StorageService storageService;@Autowiredprivate AccountService accountService;/*** 创建订单->调用库存服务扣减库存->调用账户服务扣减账户余额->修改订单状态*/@Override// 开启分布式事务,name为全局事务实例名称,rollbackFor表示遇到Exception时回滚@GlobalTransactional(name = "fsp-create-order",rollbackFor = Exception.class)public void create(Order order) {LOGGER.info("------->下单开始");//本应用创建订单orderDao.create(order);//远程调用库存服务扣减库存LOGGER.info("------->order-service中扣减库存开始");storageService.decrease(order.getProductId(),order.getCount());LOGGER.info("------->order-service中扣减库存结束");//远程调用账户服务扣减余额LOGGER.info("------->order-service中扣减余额开始");accountService.decrease(order.getUserId(),order.getMoney());LOGGER.info("------->order-service中扣减余额结束");//修改订单状态为已完成LOGGER.info("------->order-service中修改订单状态开始");orderDao.update(order.getUserId(),0);LOGGER.info("------->order-service中修改订单状态结束");LOGGER.info("------->下单结束");}
}
- 分布式事务功能演示
通过模拟异常或者注释掉@GlobalTransactional,调用下单接口,观察并对比三个数据库中的数据。
这篇关于Seata-彻底解决Spring Cloud中的分布式事务问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!