TCC-transaction源码(二):事务恢复

2024-08-26 09:08

本文主要是介绍TCC-transaction源码(二):事务恢复,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、为什么需要恢复任务

为了处理异常。在TCC事务下,假如A服务调用B服务,B服务超过事务恢复的时间还没有返回,就要取消事务,进行回滚操作,不能让事务一直这么挂着不结束。或者还没等B结果返回A服务就挂了,重启A服务后的遗留事务需要恢复。或者是B返回成功/失败后,A服务执行二阶段的确认提交/回滚事务的方法时候失败了,如果没有异常处理,A的资源便无法释放,事务无法结束。

二、恢复任务初始化类

public class RecoverScheduledJob {private TransactionRecovery transactionRecovery;private TransactionConfigurator transactionConfigurator;private Scheduler scheduler;public void init() {try {MethodInvokingJobDetailFactoryBean jobDetail = new MethodInvokingJobDetailFactoryBean();// 任务执行的实体是TransactionRecovery类的startRecover方法jobDetail.setTargetObject(transactionRecovery);jobDetail.setTargetMethod("startRecover");jobDetail.setName("transactionRecoveryJob");// 任务不允许并发执行jobDetail.setConcurrent(false);jobDetail.afterPropertiesSet();CronTriggerFactoryBean cronTrigger = new CronTriggerFactoryBean();cronTrigger.setBeanName("transactionRecoveryCronTrigger");// 任务按配置的corn表达式定时执行  cronTrigger.setCronExpression(transactionConfigurator.getRecoverConfig().getCronExpression());cronTrigger.setJobDetail(jobDetail.getObject());cronTrigger.afterPropertiesSet();scheduler.scheduleJob(jobDetail.getObject(), cronTrigger.getObject());scheduler.start();} catch (Exception e) {throw new SystemException(e);}}// ......
}

SpringTransactionConfigurator是TCC事务的全局配置类,除了提供任务执行的corn表达式,任务其他相关属性外,还有事务异步执行的线程池等。

public class SpringTransactionConfigurator implements TransactionConfigurator {private static volatile ExecutorService executorService = null;@Autowiredprivate TransactionRepository transactionRepository;/***DefaultRecoverConfig提供了RecoverConfig接口的默认配置项,包括事务恢复任务执行corn表达式,事务      *恢复间隔时间,恢复最大执行次数; */@Autowired(required = false)private RecoverConfig recoverConfig = DefaultRecoverConfig.INSTANCE;private TransactionManager transactionManager;/***配置的线程池是给事务异步提交和异步回滚用的。*线程工厂创建的线程前缀是 tcc-async-terminate-pool- +线程递增数字 + -thread-,非守护线程,优      *先级5,拒绝策略为当线程池满时通过主线程运行任务*/public void init() {transactionManager = new TransactionManager();transactionManager.setTransactionRepository(transactionRepository);if (executorService == null) {Executors.defaultThreadFactory();synchronized (SpringTransactionConfigurator.class) {if (executorService == null) {executorService = new ThreadPoolExecutor(recoverConfig.getAsyncTerminateThreadCorePoolSize(),recoverConfig.getAsyncTerminateThreadMaxPoolSize(),5L,TimeUnit.SECONDS,new ArrayBlockingQueue<Runnable>(recoverConfig.getAsyncTerminateThreadWorkQueueSize()),new ThreadFactory() {final AtomicInteger poolNumber = new AtomicInteger(1);final ThreadGroup group;final AtomicInteger threadNumber = new AtomicInteger(1);final String namePrefix;{SecurityManager securityManager = System.getSecurityManager();this.group = securityManager != null ? securityManager.getThreadGroup() : Thread.currentThread().getThreadGroup();this.namePrefix = "tcc-async-terminate-pool-" + poolNumber.getAndIncrement() + "-thread-";}public Thread newThread(Runnable runnable) {Thread thread = new Thread(this.group, runnable, this.namePrefix + this.threadNumber.getAndIncrement(), 0L);if (thread.isDaemon()) {thread.setDaemon(false);}if (thread.getPriority() != 5) {thread.setPriority(5);}return thread;}},new ThreadPoolExecutor.CallerRunsPolicy());}}}transactionManager.setExecutorService(executorService);if (transactionRepository instanceof CachableTransactionRepository) {((CachableTransactionRepository) transactionRepository).setExpireDuration(recoverConfig.getRecoverDuration());}}// ...
}

DefaultRecoverConfig

public class DefaultRecoverConfig implements RecoverConfig {public static final RecoverConfig INSTANCE = new DefaultRecoverConfig();private int maxRetryCount = 30;private int recoverDuration = 120; //120 secondsprivate String cronExpression = "0 */1 * * * ?";private int asyncTerminateThreadCorePoolSize = 512;private int asyncTerminateThreadMaxPoolSize = 1024;private int asyncTerminateThreadWorkQueueSize = 512;private Set<Class<? extends Exception>> delayCancelExceptions = new HashSet<Class<? extends Exception>>();// 当事务内发生OptimisticLockException和SocketTimeoutException异常的时候,不会立即回滚事务,// 预先加载这两个异常public DefaultRecoverConfig() {delayCancelExceptions.add(OptimisticLockException.class);delayCancelExceptions.add(SocketTimeoutException.class);}

三、为什么发生SocketTimeoutException和OptimisticLockException异常时不回滚

  • SocketTimeoutException

    作者说: 不立即回滚,主要考虑是被调用服务方存在一直在正常执行的可能,只是执行的慢,导致了调用方超时,此时如果立即回滚,在被调用方执行cancel操作的同时,被调用方的try方法还在执行,甚至cancel操作执行完了,try方法还没结束,这种情况下业务数据存在不一致的可能。目前解决办法是这类异常不立即回滚,而是由恢复job执行回滚,恢复job会在一段时间后再去调用该被调用方的cancel方法,这个时间可在RecoverConfig中设置,默认120s。

    也就是说try方法还没执行完的时候,就去执行cancel方法,如果try在cancel之后执行完,那就没法恢复了

  • OptimisticLockException

    整个TCC框架中,抛出这个异常的地方只有一处,在org.mengyun.tcctransaction.repository.CachableTransactionRepository#update 方法中,也就是更新事务状态的时候,如果被更新的条数为0,表示这条记录不存在或者记录存在但无法被更新时,抛出这个异常。

    出现这个情况的场景有:

    1. A服务调用B服务时创建事务记录,当B很久还没有返回的时候,事务已经到了恢复的时间间隔,A就会取消事务,删除事务记录(B作为事务参与者,取消方法也会被A调用)。之后B再返回的时候,无论返回成功A提交,还是返回失败A回滚,这条事务记录都已经不存在了,就会抛异常。此时已经没有什么好回滚的了,所以忽略这个异常。

    2. A服务调用B服务时,在真正调用B的方法之前,需要

      (1)、创建根事务记录;

      (2)、更新根事务的参与者A;

      (3)、更新根事务的参与者B;

      (4)、。。。。。。如果事务很长,有更多的参与者,则需要诸葛加入事务参与者队列

      在这个过程中如果事务就到了恢复间隔,执行回滚删除事务记录,那后面加入事务参与者队列的更新操作,都会抛OptimisticLockException异常。

    场景1是最常见的抛出OptimisticLockException异常的场景,所以一定要配置事务恢复间隔大于服务调用超时时间。这也符合任务补偿的设计理念,任务是做补偿用的,应该在主干流程执行结束后执行,而不是提前执行。场景2出现的概率比较小,毕竟协调事务参与者是用不了多少时间的。

四、恢复任务执行体TransactionRecovery

public class TransactionRecovery {private TransactionConfigurator transactionConfigurator;// 恢复任务执行的方法public void startRecover() {List<Transaction> transactions = loadErrorTransactions();recoverErrorTransactions(transactions);}private List<Transaction> loadErrorTransactions() {long currentTimeInMillis = Calendar.getInstance().getTimeInMillis();TransactionRepository transactionRepository = transactionConfigurator.getTransactionRepository();RecoverConfig recoverConfig = transactionConfigurator.getRecoverConfig();/***获取所有超过了恢复期限的事务,一直跟进方法可以看到是获取事务记录表中* LAST_UPDATE_TIME < 当前时间 - 恢复间隔的所有记录*/return transactionRepository.findAllUnmodifiedSince(new Date(currentTimeInMillis - recoverConfig.getRecoverDuration() * 1000));}private void recoverErrorTransactions(List<Transaction> transactions) {for (Transaction transaction : transactions) {/*** 如果恢复任务执行次数超过了设置的最大恢复次数,打日志跳过这个事务,此时只能人为干预*/if (transaction.getRetriedCount() > transactionConfigurator.getRecoverConfig().getMaxRetryCount()) {logger.error(String.format("recover failed with max retry count,will not try again. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)));continue;}// 如果是分支事务,并且当前时间 > 事务发生时间 + 最大恢复次数 * 恢复间隔,也不再执行恢复if (transaction.getTransactionType().equals(TransactionType.BRANCH)&& (transaction.getCreateTime().getTime() +transactionConfigurator.getRecoverConfig().getMaxRetryCount() *transactionConfigurator.getRecoverConfig().getRecoverDuration() * 1000> System.currentTimeMillis())) {continue;}try {// 重试次数+1transaction.addRetriedCount();// 如果事务状态是CONFIRMING,提交事务if (transaction.getStatus().equals(TransactionStatus.CONFIRMING)) {transaction.changeStatus(TransactionStatus.CONFIRMING);transactionConfigurator.getTransactionRepository().update(transaction);transaction.commit();// 提交成功后删除事务记录transactionConfigurator.getTransactionRepository().delete(transaction);// 如果事务状态是CANCELLING,执行回滚} else if (transaction.getStatus().equals(TransactionStatus.CANCELLING)|| transaction.getTransactionType().equals(TransactionType.ROOT)) {transaction.changeStatus(TransactionStatus.CANCELLING);transactionConfigurator.getTransactionRepository().update(transaction);transaction.rollback();// 回滚成功后删除事务记录 transactionConfigurator.getTransactionRepository().delete(transaction);}} catch (Throwable throwable) {if (throwable instanceof OptimisticLockException|| ExceptionUtils.getRootCause(throwable) instanceof OptimisticLockException) {logger.warn(String.format("optimisticLockException happened while recover. txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);} else {logger.error(String.format("recover failed, txid:%s, status:%s,retried count:%d,transaction content:%s", transaction.getXid(), transaction.getStatus().getId(), transaction.getRetriedCount(), JSON.toJSONString(transaction)), throwable);}}}}public void setTransactionConfigurator(TransactionConfigurator transactionConfigurator) {this.transactionConfigurator = transactionConfigurator;}
}

这篇关于TCC-transaction源码(二):事务恢复的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1108143

相关文章

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

电脑桌面文件删除了怎么找回来?别急,快速恢复攻略在此

在日常使用电脑的过程中,我们经常会遇到这样的情况:一不小心,桌面上的某个重要文件被删除了。这时,大多数人可能会感到惊慌失措,不知所措。 其实,不必过于担心,因为有很多方法可以帮助我们找回被删除的桌面文件。下面,就让我们一起来了解一下这些恢复桌面文件的方法吧。 一、使用撤销操作 如果我们刚刚删除了桌面上的文件,并且还没有进行其他操作,那么可以尝试使用撤销操作来恢复文件。在键盘上同时按下“C

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

音视频入门基础:WAV专题(10)——FFmpeg源码中计算WAV音频文件每个packet的pts、dts的实现

一、引言 从文章《音视频入门基础:WAV专题(6)——通过FFprobe显示WAV音频文件每个数据包的信息》中我们可以知道,通过FFprobe命令可以打印WAV音频文件每个packet(也称为数据包或多媒体包)的信息,这些信息包含该packet的pts、dts: 打印出来的“pts”实际是AVPacket结构体中的成员变量pts,是以AVStream->time_base为单位的显

kubelet组件的启动流程源码分析

概述 摘要: 本文将总结kubelet的作用以及原理,在有一定基础认识的前提下,通过阅读kubelet源码,对kubelet组件的启动流程进行分析。 正文 kubelet的作用 这里对kubelet的作用做一个简单总结。 节点管理 节点的注册 节点状态更新 容器管理(pod生命周期管理) 监听apiserver的容器事件 容器的创建、删除(CRI) 容器的网络的创建与删除

MySql 事务练习

事务(transaction) -- 事务 transaction-- 事务是一组操作的集合,是一个不可分割的工作单位,事务会将所有的操作作为一个整体一起向系统提交或撤销请求-- 事务的操作要么同时成功,要么同时失败-- MySql的事务默认是自动提交的,当执行一个DML语句,MySql会立即自动隐式提交事务-- 常见案例:银行转账-- 逻辑:A给B转账1000:1.查询