本文主要是介绍Spring Cloud整合Seata实现分布式事务,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
文章目录
- 1.Seata
- 1.1 官网
- 1.2 下载
- 1.3 通过安装包运行seata
- 1.3.1 解压seata-server-1.3.0.zip
- 1.3.2 修改 conf/file.conf 配置文件
- 1.3.3 修改conf/registry.conf配置文件
- 1.3.4 添加seata配置信息到nacos
- 1.3.5 配置seata服务端数据库表结构
- 1.3.6 启动seata
- 2.Spring Cloud整合Seata
- 2.1 pom.xml中添加seata依赖
- 2.2 application.yml中添加seata配置信息
- 2.3 配置代理数据源
- 2.4 在业务库中创建undo_log表结构
- 3.测试
- 4.问题处理
- 4.1 springcloud之seata在微服务模块全局异常捕捉后导致事务不会滚方案解决
1.Seata
注意:当前教程使用的seata版本为1.3.0,注册中心使用的Nacos 1.3.2。
1.1 官网
seata详细介绍请移步:seata官网
1.2 下载
安装包下载:seata-server-1.3.0.zip
源码包下载:Source code(zip)
1.3 通过安装包运行seata
1.3.1 解压seata-server-1.3.0.zip
# 解压
unzip seata-server-1.3.0.zip
# 进入seata根目录
cd seata-server
1.3.2 修改 conf/file.conf 配置文件
修改 conf/file.conf 配置文件,使seata信息存在在数据库中
# 编辑file.conf文件
vim conf/file.conf
file.conf修改内容如下
#①这里手动加入service模块
service {#transaction service group mapping#my_test_tx_group为自定义seata事务分组名称,可根据实际意义修改,也可以不改vgroup_mapping.my_test_tx_group = "default"#only support when registry.type=file, please don't set multiple addresses# 此seata的地址,如果seata在启动的时候指定了其他端口,此处一定要记得更改default.grouplist = "127.0.0.1:8901"#disable seatadisableGlobalTransaction = false
}
store {## store mode: file、db、redismode = "db" #②数据存储方式,默认为file,此处改成db,我们使用数据库来存储相关信息。## file store propertyfile {## store location dirdir = "sessionStore"# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptionsmaxBranchSessionSize = 16384# globe session size , if exceeded throws exceptionsmaxGlobalSessionSize = 512# file buffer size , if exceeded allocate new bufferfileWriteBufferCacheSize = 16384# when recover batch read sizesessionReloadReadSize = 100# async, syncflushDiskMode = async}## database store propertydb {## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.datasource = "druid"## mysql/oracle/postgresql/h2/oceanbase etc.#③数据库类型dbType = "mysql" #④数据库驱动driverClassName = "com.mysql.jdbc.Driver" #⑤数据库地址url = "jdbc:mysql://192.168.0.30:3306/seata_1.3.0?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai" #⑥数据库用户名user = "root" #⑦数据库密码password = "123456" minConn = 5maxConn = 30globalTable = "global_table"branchTable = "branch_table"lockTable = "lock_table"queryLimit = 100maxWait = 5000}## redis store propertyredis {host = "127.0.0.1"port = "6379"password = ""database = "0"minConn = 1maxConn = 10queryLimit = 100}
}
注意:①②③④⑤⑥⑦处是需要修改的地方,不要改错了。
1.3.3 修改conf/registry.conf配置文件
修改conf/registry.conf配置文件,使seata注册到Nacos上。
#编辑配置文件
vim conf/registry.conf
registry.conf修改如下:
registry {# file 、nacos 、eureka、redis、zk、consul、etcd3、sofatype = "nacos" #①注册中心类型,默认为file,此处使用nacosnacos {application = "seata-server"serverAddr = "127.0.0.1:8848" #②nacos地址group = "SEATA_GROUP"namespace = "public" #③命名空间,可根据实际需要指定,此处使用publiccluster = "default"username = ""password = ""}config {# file、nacos 、apollo、zk、consul、etcd3type = "nacos" #④配置中心类型,默认是file,此处使用nacosnacos {serverAddr = "127.0.0.1:8848" #⑤nacos地址namespace = "public" #⑥命名空间,可根据实际需要指定,此处使用publicgroup = "SEATA_GROUP"username = ""password = ""}}
}
注意:①②③④⑤⑥处是需要修改的地方。
1.3.4 添加seata配置信息到nacos
- 在seata源码中复制seata-1.3.0/script/config-server/config.txt到seata安装包的根目录下。
- 在seata源码中复制seata-1.3.0/script/config-server/nacos/nacos-config.sh到seata安装包的conf目录下。
- 修改seata安装包下的config.txt文件,修改内容如下:
#seata地址,如果seata在启动的时候指定了其他端口,此处一定要记得更改(此地址只有在registry.type=file时生效)
service.default.grouplist=default
service.vgroup_mapping.my_test_tx_group=default
#数据库类型
store.db.dbType=mysql
#数据库驱动
store.db.driverClassName=com.mysql.jdbc.Driver
#数据库地址
store.db.url=jdbc:mysql://192.168.0.30:3306/seata_1.3.0?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=Asia/Shanghai
#数据库用户名
store.db.user=root
#数据库密码
store.db.password=123456
- config.txt文件修改完成后,执行nacos-config.sh脚本向nacos推送配置信息。
#执行脚本,默认参数执行
sh config/nacos-config.sh
# 指定参数
sh nacos-config.sh -h 192.168.0.30 -p 8848 -g SEATA_GROUP -t d655c2a3-24d9-4eb4-a19b-0866d3d0b28f -u nacos -w nacossh nacos-config.sh -h 127.0.0.1 -p 8848 -g SEATA_GROUP -t 7708d8cf-661e-43b0-98ec-00445c83a077 -u nacos -w nacos
注:
-h -p 指定nacos的端口地址;
-g 指定配置的分组,是配置的分组;
-t 指定命名空间id;
-u -w指定nacos的用户名和密码,
同样,这里开启了nacos注册和配置认证的才需要指定。
注意:nacos必须先启动;config.txt必须在nacos-config.sh的上级目录下,不要放错位置了。
执行成功后提示:init nacos config finished, please start seata-server。
也可以进入nacos查看配置信息。
1.3.5 配置seata服务端数据库表结构
seatav1.0.0以下版本提供conf/db_store.sql脚本,v1.0.0(包括v1.0.0)版本起安装包不提供,此处埋下了一个坑。但是可以通过下载源码获取这些sql脚本(seata-/script/server/db目录下)。此处我贴出Mysql和Oracle的脚本,大家也不用到处去找了。
Mysql脚本:
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(`xid` VARCHAR(128) NOT NULL,`transaction_id` BIGINT,`status` TINYINT NOT NULL,`application_id` VARCHAR(32),`transaction_service_group` VARCHAR(32),`transaction_name` VARCHAR(128),`timeout` INT,`begin_time` BIGINT,`application_data` VARCHAR(2000),`gmt_create` DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`xid`),KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8;-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(`branch_id` BIGINT NOT NULL,`xid` VARCHAR(128) NOT NULL,`transaction_id` BIGINT,`resource_group_id` VARCHAR(32),`resource_id` VARCHAR(256),`branch_type` VARCHAR(8),`status` TINYINT,`client_id` VARCHAR(64),`application_data` VARCHAR(2000),`gmt_create` DATETIME(6),`gmt_modified` DATETIME(6),PRIMARY KEY (`branch_id`),KEY `idx_xid` (`xid`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8;-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(`row_key` VARCHAR(128) NOT NULL,`xid` VARCHAR(96),`transaction_id` BIGINT,`branch_id` BIGINT NOT NULL,`resource_id` VARCHAR(256),`table_name` VARCHAR(32),`pk` VARCHAR(36),`gmt_create` DATETIME,`gmt_modified` DATETIME,PRIMARY KEY (`row_key`),KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDBDEFAULT CHARSET = utf8;
Oracle脚本:
-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE global_table
(xid VARCHAR2(128) NOT NULL,transaction_id NUMBER(19),status NUMBER(3) NOT NULL,application_id VARCHAR2(32),transaction_service_group VARCHAR2(32),transaction_name VARCHAR2(128),timeout NUMBER(10),begin_time NUMBER(19),application_data VARCHAR2(2000),gmt_create TIMESTAMP(0),gmt_modified TIMESTAMP(0),PRIMARY KEY (xid)
);CREATE INDEX idx_gmt_modified_status ON global_table (gmt_modified, status);
CREATE INDEX idx_transaction_id ON global_table (transaction_id);-- the table to store BranchSession data
CREATE TABLE branch_table
(branch_id NUMBER(19) NOT NULL,xid VARCHAR2(128) NOT NULL,transaction_id NUMBER(19),resource_group_id VARCHAR2(32),resource_id VARCHAR2(256),branch_type VARCHAR2(8),status NUMBER(3),client_id VARCHAR2(64),application_data VARCHAR2(2000),gmt_create TIMESTAMP(6),gmt_modified TIMESTAMP(6),PRIMARY KEY (branch_id)
);CREATE INDEX idx_xid ON branch_table (xid);-- the table to store lock data
CREATE TABLE lock_table
(row_key VARCHAR2(128) NOT NULL,xid VARCHAR2(96),transaction_id NUMBER(19),branch_id NUMBER(19) NOT NULL,resource_id VARCHAR2(256),table_name VARCHAR2(32),pk VARCHAR2(36),gmt_create TIMESTAMP(0),gmt_modified TIMESTAMP(0),PRIMARY KEY (row_key)
);CREATE INDEX idx_branch_id ON lock_table (branch_id);
1.3.6 启动seata
Linux启动命令:
#Linux启动脚本 -p 指定seata的端口,默认8901
nohup sh seata-server.sh -p 8901 &
Windows启动命令:
#! windows启动脚本
seata-server.bat -p 8901
2.Spring Cloud整合Seata
2.1 pom.xml中添加seata依赖
<!--seata-->
<dependency><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId><version>1.3.0</version>
</dependency>
<dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-starter-alibaba-seata</artifactId><exclusions><exclusion><groupId>io.seata</groupId><artifactId>seata-spring-boot-starter</artifactId></exclusion></exclusions>
</dependency>
2.2 application.yml中添加seata配置信息
seata:tx-service-group: my_test_tx_group #此处配置自定义的seata事务分组名称config:type: nacos #配置中心,此处使用nacosnacos:server-addr: 127.0.0.1:8848 #配置中心地址namespace: defaultregistry:type: nacos #注册中心,此处使用nacosnacos:server-addr: 127.0.0.1:8848 #注册中心地址namespace: default
在seata.config.nacos和seata.registry.nacos的配置类中还有很多配置属性,例如:namespace、group、username、password,由于都是使用的默认的配置,所有就没有指定了,大家也可以根据自己的实际情况添加不同的配置信息。
2.3 配置代理数据源
/*** 向Spring容器中注入DruidConfiguration*/
@Configuration
public class DruidConfiguration {@Autowiredprivate Properties properties;@Bean//此处不能添加@RefreshScopepublic DruidDataSource dataSource() {DruidDataSource datasource = new DruidDataSource();//设置datasource的信信息,如:url、username、password.....return datasource;}/*** 代理数据源*/@Primary@Bean("dataSource")//动态刷新数据源在此添加 @RefreshScopepublic DataSourceProxy dataSourceProxy(DataSource druidDataSource){return new DataSourceProxy(druidDataSource);}
}
注意:
- 如果数据源上使用了@RefreshScope来动态刷新数据源,在添加代理数据源后@RefreshScope则不能写在DruidDataSource的bean上面,应该写在DataSourceProxy的bean上,不然会报错。
- 参与分布式事务的所有数据库操作,禁止使用批量插入和批量修改的SQL,批量操作将会导致sql异常或者事务无法回滚的问题。如果有批量操作的,就使用for循环单条处理。
2.4 在业务库中创建undo_log表结构
此表结构在源码中的seata-1.3.0/script/client/at/db目录下。
Mysql数据库:
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(`branch_id` BIGINT(20) NOT NULL COMMENT 'branch transaction id',`xid` VARCHAR(100) NOT NULL COMMENT 'global transaction id',`context` VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',`rollback_info` LONGBLOB NOT NULL COMMENT 'rollback info',`log_status` INT(11) NOT NULL COMMENT '0:normal status,1:defense status',`log_created` DATETIME(6) NOT NULL COMMENT 'create datetime',`log_modified` DATETIME(6) NOT NULL COMMENT 'modify datetime',UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDBAUTO_INCREMENT = 1DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';
Oracle数据库:
-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE undo_log
(id NUMBER(19) NOT NULL,branch_id NUMBER(19) NOT NULL,xid VARCHAR2(100) NOT NULL,context VARCHAR2(128) NOT NULL,rollback_info BLOB NOT NULL,log_status NUMBER(10) NOT NULL,log_created TIMESTAMP(0) NOT NULL,log_modified TIMESTAMP(0) NOT NULL,PRIMARY KEY (id),CONSTRAINT ux_undo_log UNIQUE (xid, branch_id)
);
COMMENT ON TABLE undo_log IS 'AT transaction mode undo table';
-- Generate ID using sequence and trigger
CREATE SEQUENCE UNDO_LOG_SEQ START WITH 1 INCREMENT BY 1;
3.测试
seata分布式事务的实现对业务代码零侵入,当需要开启分布式事务的时候,只需要在事务发起方添加@GlobalTransactional(rollbackFor = Exception.class)即可。
案例:
//事务发起方
@Service
public class OrderService {/*** 模拟创建订单* @return 1/0 成功/失败*/@GlobalTransactional(rollbackFor = Exception.class)public int createOrder(){int result = 0;//订单创建int stock = 50; //订单库存double money = 50; //订单金额//1、创建订单,调用当前服务的mapperresult = orderMapper.createOrder(stock, money);//2、减库存--远程调用result = stockClient.reduceStock(stock);//3、扣余额--远程调用result = moneyClient.reduceMoney(money);return result;}
}
//事务参与方1-库存
@Service
public class StockService {/*** 减库存* @param stock 减少的库存* @return*/@Transactional(rollbackFor = Exception.class)public int reduceStock(int stock) {return stockMapper.reduceStock(stock);}
}
//事务参与方2-余额
@Service
public class MoneyService {/*** 减金额* @param money 金额* @return*/@Transactional(rollbackFor = Exception.class)public int reduceMoney(double money) {return moneyMapper.reduceMoney(money);}
}
注意:如果使用Feign远程调用服务,并且配置了fallback熔断器,远程调用服务失败不会抛出异常,而是进入到熔断器中,从而导致事务无法正常回滚。这种情况下,可在在fallback中手动回滚事务。如果大家有更好的解决方式,请告诉我一下,谢谢。
//手动回滚seata事务
@Component
public class StockClientFallBack implements StockClient {private Logger logger = LoggerFactory.getLogger(this.getClass());@Overridepublic int reduceStock(int stock) {logger.error(Thread.currentThread().getName() + ".reduceStock({})已进入熔断器", stock);try {//手动回滚全局事务GlobalTransactionContext.reload(RootContext.getXID()).rollback();}catch (TransactionException e){e.printStackTrace();}return 0;}
}
至此,Spring Cloud整合Seata实现分布式事务结束。
4.问题处理
4.1 springcloud之seata在微服务模块全局异常捕捉后导致事务不会滚方案解决
问题原因:当service A 调用Service B时,如果ServiceA报错,ServiceB能回滚,但是如果ServiceB报错,ServiceA是无法进行回滚的。
原因分析:经查找,出现此问题是因为系统做了全局异常处理,返回统一的异常信息个前端,这就导致当ServiceB报错抛出事务异常,但是此异常被全局异常类给处理了,就导致ServiceA无法感知ServiceB抛事务异常了,所以就导致ServiceA无法回滚。
解决方案:
方案一:既然ServiceB抛出的事务异常被统一异常处理了,那么在ServiceA中手动判断ServiceB接口是否请求成功,如果失败,则手动回滚,具体代码如下:
@GlobalTransactional(rollbackFor = Exception.class)
@Override
public int testTransactional() throws Exception {TestDemo demo = new TestDemo();demo.setId(idKeyGenerator.getId());demo.setName("张三");//1.调用本服务this.save(demo);//2.调用其他服务ResultData<Void> resultData = fileClient.add();//此处手动判断add请求是否成功,如果失败则手动回滚if (!resultData.getSuccess()) {//手动回滚全局事务GlobalTransactionContext.reload(RootContext.getXID()).rollback();return 0;}return 0;
}
方案二:通过查看seata的源码发现,当response响应错误时,就会抛出异常进行事务回滚,既然这样,那我们也可以在全局统一异常处理中去判断当前请求是否存在分布式事务,如果存在则直接把response的状态码设置为500即可,最后ServiceA就会判断出当前请求失败并进入熔断器,然后就执行我们的事务回滚代码了。
package com.yunling.file.config.exception.handler;import com.yunling.file.common.ResultData;
import com.yunling.file.common.ReturnCode;
import com.yunling.file.config.exception.BizException;
import com.yunling.file.config.exception.ParamValidateException;
import io.seata.core.context.RootContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.ObjectUtils;
import org.springframework.web.bind.annotation.ControllerAdvice;
import org.springframework.web.bind.annotation.ExceptionHandler;
import org.springframework.web.bind.annotation.ResponseBody;import javax.servlet.http.HttpServletResponse;/*** 自定义全局异常处理** @author 谭永强* @date 2022-03-11*/
@ControllerAdvice
public class GlobalExceptionHandler {private final Logger logger = LoggerFactory.getLogger(this.getClass());/*** 自定义异常(业务异常)** @param e 异常信息* @return 统一返回结果*/@ExceptionHandler(value = BizException.class)@ResponseBodypublic ResultData<String> bizExceptionHandler(BizException e, HttpServletResponse response) {logger.error("发生业务异常-->原因是:{}", e.getErrorMsg());setRespErrStatus(response);return ResultData.fail(e.getErrorCode(), e.getErrorMsg());}/*** 参数校验异常(业务异常)** @param e 异常信息* @return 统一返回结果*/@ExceptionHandler(value = ParamValidateException.class)@ResponseBodypublic ResultData<String> pvExceptionHandler(ParamValidateException e, HttpServletResponse response) {logger.error("参数校验异常-->原因是:{}", e.getErrorMsg());setRespErrStatus(response);return ResultData.fail(e.getErrorCode(), e.getErrorMsg());}/*** 其他异常处理** @param e 异常信息* @return 统一返回结果*/@ExceptionHandler(Exception.class)@ResponseBodypublic ResultData<String> exception(Exception e, HttpServletResponse response) {logger.error("全局异常信息 ex={}", e.getMessage(), e);setRespErrStatus(response);return ResultData.fail(ReturnCode.RC500.getCode(), e.getMessage());}/*** 如果开启分布式事务,就设置response.status = 500,seata的tm(事务管理器)就是感知到 TmTransactionException异常,发起事务回滚*/private void setRespErrStatus(HttpServletResponse response) {//如果开启分布式事务,设置错误状态码,让事务回滚if (!ObjectUtils.isEmpty(RootContext.getXID())) {response.setStatus(500);} else {response.setStatus(200);}}
}
这篇关于Spring Cloud整合Seata实现分布式事务的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!