支付宝一面:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!

本文主要是介绍支付宝一面:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景介绍

1.最近有一个大数据量插入的操作入库的业务场景,需要先做一些其他修改操作,然后在执行插入操作,由于插入数据可能会很多,用到多线程去拆分数据并行处理来提高响应时间,如果有一个线程执行失败,则全部回滚。

2.在spring中可以使用@Transactional注解去控制事务,使出现异常时会进行回滚,在多线程中,这个注解则不会生效,如果主线程需要先执行一些修改数据库的操作,当子线程在进行处理出现异常时,主线程修改的数据则不会回滚,导致数据错误。

3.下面用一个简单示例演示多线程事务。

公用的类和方法

/*** 平均拆分list方法.* @param source* @param n* @param <T>* @return*/
public static <T> List<List<T>> averageAssign(List<T> source,int n){List<List<T>> result=new ArrayList<List<T>>();int remaider=source.size()%n; int number=source.size()/n; int offset=0;//偏移量for(int i=0;i<n;i++){List<T> value=null;if(remaider>0){value=source.subList(i*number+offset, (i+1)*number+offset+1);remaider--;offset++;}else{value=source.subList(i*number+offset, (i+1)*number+offset);}result.add(value);}return result;
}
/**  线程池配置* @version V1.0*/
public class ExecutorConfig {private static int maxPoolSize = Runtime.getRuntime().availableProcessors();private volatile static ExecutorService executorService;public static ExecutorService getThreadPool() {if (executorService == null){synchronized (ExecutorConfig.class){if (executorService == null){executorService =  newThreadPool();}}}return executorService;}private static  ExecutorService newThreadPool(){int queueSize = 500;int corePool = Math.min(5, maxPoolSize);return new ThreadPoolExecutor(corePool, maxPoolSize, 10000L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<>(queueSize),new ThreadPoolExecutor.AbortPolicy());}private ExecutorConfig(){}
}
/** 获取sqlSession* @author 86182* @version V1.0*/
@Component
public class SqlContext {@Resourceprivate SqlSessionTemplate sqlSessionTemplate;public SqlSession getSqlSession(){SqlSessionFactory sqlSessionFactory = sqlSessionTemplate.getSqlSessionFactory();return sqlSessionFactory.openSession();}
}

示例事务不成功操作

/*** 测试多线程事务.* @param employeeDOList*/
@Override
@Transactional
public void saveThread(List<EmployeeDO> employeeDOList) {try {//先做删除操作,如果子线程出现异常,此操作不会回滚this.getBaseMapper().delete(null);//获取线程池ExecutorService service = ExecutorConfig.getThreadPool();//拆分数据,拆分5份List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);//执行的线程Thread []threadArray = new Thread[lists.size()];//监控子线程执行完毕,再执行主线程,要不然会导致主线程关闭,子线程也会随着关闭CountDownLatch countDownLatch = new CountDownLatch(lists.size());AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<EmployeeDO> list  = lists.get(i);threadArray[i] =  new Thread(() -> {try {//最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("001","出现异常");}//批量添加,mybatisPlus中自带的batch方法this.saveBatch(list);}finally {countDownLatch.countDown();}});}for (int i = 0; i <lists.size(); i++){service.execute(threadArray[i]);}//当子线程执行完毕时,主线程再往下执行countDownLatch.await();System.out.println("添加完毕");}catch (Exception e){log.info("error",e);throw new ServiceException("002","出现异常");}finally {connection.close();}
}

数据库中存在一条数据。

//测试用例
@RunWith(SpringRunner.class)
@SpringBootTest(classes = { ThreadTest01.class, MainApplication.class})
public class ThreadTest01 {@Resourceprivate EmployeeBO employeeBO;/***   测试多线程事务.* @throws InterruptedException*/@Testpublic  void MoreThreadTest2() throws InterruptedException {int size = 10;List<EmployeeDO> employeeDOList = new ArrayList<>(size);for (int i = 0; i<size;i++){EmployeeDO employeeDO = new EmployeeDO();employeeDO.setEmployeeName("lol"+i);employeeDO.setAge(18);employeeDO.setGender(1);employeeDO.setIdNumber(i+"XX");employeeDO.setCreatTime(Calendar.getInstance().getTime());employeeDOList.add(employeeDO);}try {employeeBO.saveThread(employeeDOList);System.out.println("添加成功");}catch (Exception e){e.printStackTrace();}}
}

测试结果

可以发现子线程组执行时,有一个线程执行失败,其他线程也会抛出异常,但是主线程中执行的删除操作,没有回滚,Transactional注解没有生效。

使用sqlSession控制手动提交事务

@Resource
SqlContext sqlContext;/*** 测试多线程事务.* @param employeeDOList*/
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);//获取mapperEmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);//获取执行器ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList  = new ArrayList<>();//拆分listList<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);AtomicBoolean atomicBoolean = new AtomicBoolean(true);for (int i =0;i<lists.size();i++){if (i==lists.size()-1){atomicBoolean.set(false);}List<EmployeeDO> list  = lists.get(i);//使用返回结果的callable去执行,Callable<Integer> callable = () -> {//让最后一个线程抛出异常if (!atomicBoolean.get()){throw new ServiceException("001","出现异常");}return employeeMapper.saveBatch(list);};callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {//如果有一个执行不成功,则全部回滚if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("002","出现异常");}finally {connection.close();}
}
// sql
<insert id="saveBatch" parameterType="List">INSERT INTOemployee (employee_id,age,employee_name,birth_date,gender,id_number,creat_time,update_time,status)values<foreach collection="list" item="item" index="index" separator=",">(#{item.employeeId},#{item.age},#{item.employeeName},#{item.birthDate},#{item.gender},#{item.idNumber},#{item.creatTime},#{item.updateTime},#{item.status})</foreach></insert>

数据库中一条数据。

测试结果,抛出异常。

删除操作的数据回滚了,数据库中的数据依旧存在,说明事务成功了。

成功操作示例:
 @Resource
SqlContext sqlContext;
/*** 测试多线程事务.* @param employeeDOList*/
@Override
public void saveThread(List<EmployeeDO> employeeDOList) throws SQLException {// 获取数据库连接,获取会话(内部自有事务)SqlSession sqlSession = sqlContext.getSqlSession();Connection connection = sqlSession.getConnection();try {// 设置手动提交connection.setAutoCommit(false);EmployeeMapper employeeMapper = sqlSession.getMapper(EmployeeMapper.class);//先做删除操作employeeMapper.delete(null);ExecutorService service = ExecutorConfig.getThreadPool();List<Callable<Integer>> callableList  = new ArrayList<>();List<List<EmployeeDO>> lists=averageAssign(employeeDOList, 5);for (int i =0;i<lists.size();i++){List<EmployeeDO> list  = lists.get(i);Callable<Integer> callable = () -> employeeMapper.saveBatch(list);callableList.add(callable);}//执行子线程List<Future<Integer>> futures = service.invokeAll(callableList);for (Future<Integer> future:futures) {if (future.get()<=0){connection.rollback();return;}}connection.commit();System.out.println("添加完毕");}catch (Exception e){connection.rollback();log.info("error",e);throw new ServiceException("002","出现异常");// throw new ServiceException(ExceptionCodeEnum.EMPLOYEE_SAVE_OR_UPDATE_ERROR);}
}

测试结果。

数据库中数据。

删除的删除了,添加的添加成功了,测试成功。

这篇关于支付宝一面:多线程事务怎么回滚?说用 @Transactional 可以回去等通知了!的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/872724

相关文章

怎么关闭Ubuntu无人值守升级? Ubuntu禁止自动更新的技巧

《怎么关闭Ubuntu无人值守升级?Ubuntu禁止自动更新的技巧》UbuntuLinux系统禁止自动更新的时候,提示“无人值守升级在关机期间,请不要关闭计算机进程”,该怎么解决这个问题?详细请看... 本教程教你如何处理无人值守的升级,即 Ubuntu linux 的自动系统更新。来源:https://

Ubuntu系统怎么安装Warp? 新一代AI 终端神器安装使用方法

《Ubuntu系统怎么安装Warp?新一代AI终端神器安装使用方法》Warp是一款使用Rust开发的现代化AI终端工具,该怎么再Ubuntu系统中安装使用呢?下面我们就来看看详细教程... Warp Terminal 是一款使用 Rust 开发的现代化「AI 终端」工具。最初它只支持 MACOS,但在 20

LinuxMint怎么安装? Linux Mint22下载安装图文教程

《LinuxMint怎么安装?LinuxMint22下载安装图文教程》LinuxMint22发布以后,有很多新功能,很多朋友想要下载并安装,该怎么操作呢?下面我们就来看看详细安装指南... linux Mint 是一款基于 Ubuntu 的流行发行版,凭借其现代、精致、易于使用的特性,深受小伙伴们所喜爱。对

macOS怎么轻松更换App图标? Mac电脑图标更换指南

《macOS怎么轻松更换App图标?Mac电脑图标更换指南》想要给你的Mac电脑按照自己的喜好来更换App图标?其实非常简单,只需要两步就能搞定,下面我来详细讲解一下... 虽然 MACOS 的个性化定制选项已经「缩水」,不如早期版本那么丰富,www.chinasem.cn但我们仍然可以按照自己的喜好来更换

Redis事务与数据持久化方式

《Redis事务与数据持久化方式》该文档主要介绍了Redis事务和持久化机制,事务通过将多个命令打包执行,而持久化则通过快照(RDB)和追加式文件(AOF)两种方式将内存数据保存到磁盘,以防止数据丢失... 目录一、Redis 事务1.1 事务本质1.2 数据库事务与redis事务1.2.1 数据库事务1.

Ubuntu 怎么启用 Universe 和 Multiverse 软件源?

《Ubuntu怎么启用Universe和Multiverse软件源?》在Ubuntu中,软件源是用于获取和安装软件的服务器,通过设置和管理软件源,您可以确保系统能够从可靠的来源获取最新的软件... Ubuntu 是一款广受认可且声誉良好的开源操作系统,允许用户通过其庞大的软件包来定制和增强计算体验。这些软件

Ubuntu 24.04 LTS怎么关闭 Ubuntu Pro 更新提示弹窗?

《Ubuntu24.04LTS怎么关闭UbuntuPro更新提示弹窗?》Ubuntu每次开机都会弹窗提示安全更新,设置里最多只能取消自动下载,自动更新,但无法做到直接让自动更新的弹窗不出现,... 如果你正在使用 Ubuntu 24.04 LTS,可能会注意到——在使用「软件更新器」或运行 APT 命令时,

TP-LINK/水星和hasivo交换机怎么选? 三款网管交换机系统功能对比

《TP-LINK/水星和hasivo交换机怎么选?三款网管交换机系统功能对比》今天选了三款都是”8+1″的2.5G网管交换机,分别是TP-LINK水星和hasivo交换机,该怎么选呢?这些交换机功... TP-LINK、水星和hasivo这三台交换机都是”8+1″的2.5G网管交换机,我手里的China编程has

SpringBoot嵌套事务详解及失效解决方案

《SpringBoot嵌套事务详解及失效解决方案》在复杂的业务场景中,嵌套事务可以帮助我们更加精细地控制数据的一致性,然而,在SpringBoot中,如果嵌套事务的配置不当,可能会导致事务不生效的问题... 目录什么是嵌套事务?嵌套事务失效的原因核心问题:嵌套事务的解决方案方案一:将嵌套事务方法提取到独立类

AI绘图怎么变现?想做点副业的小白必看!

在科技飞速发展的今天,AI绘图作为一种新兴技术,不仅改变了艺术创作的方式,也为创作者提供了多种变现途径。本文将详细探讨几种常见的AI绘图变现方式,帮助创作者更好地利用这一技术实现经济收益。 更多实操教程和AI绘画工具,可以扫描下方,免费获取 定制服务:个性化的创意商机 个性化定制 AI绘图技术能够根据用户需求生成个性化的头像、壁纸、插画等作品。例如,姓氏头像在电商平台上非常受欢迎,