Narayana-XA事务恢复(5)

2023-11-23 05:32
文章标签 恢复 事务 xa narayana

本文主要是介绍Narayana-XA事务恢复(5),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Narayana-XA事务恢复(5)

说事务恢复流程之前,我们来讨论下,会啥会出现事务恢复?XA二阶段提交协议不是强一致性的吗?要解答这个问题,我们就要来看看XA二阶段协议有什么问题?

问题一 :单点故障

由于协调者的重要性,一旦协调者TM发生故障。参与者RM会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)

问题二 :数据不一致

数据不一致。在二阶段提交的阶段二中,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,这会导致只有一部分参与者接受到了commit请求。而在这部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。

如何解决?

解决的方案简单,就是我们在事务的操作的每一步,我们都需要对事务状态的日志进行人为的记录,我们可以把日志记录存储在我们想存储的地方,可以是本地存储,也可以中心化的存储。Narayana的开源版本,提供了filedb 2种方式存储,file只能支持单机环境,而db是可以支持集群环境。

Narayana 事务恢复流程。

Narayana使用了单线程轮询RM,执行XA recovery语句,来判断是否有需要恢复的语句。

具体的代码 com.arjuna.ats.internal.arjuna.recovery.PeriodicRecovery.run() 方法。以下是代码:

 public void run (){doInitialWait();boolean finished = false;do{boolean workToDo = false;// ok, get to the point where we are ready to start a scansynchronized(_stateLock) {if (getStatus() == Status.SCANNING) {// need to wait for some other scan to finishif (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debug("PeriodicRecovery: background thread waiting on other scan");}doScanningWait();// we don't wait around if a worker scan request has just come inif (getMode() == Mode.ENABLED && !_workerScanRequested) {// the last guy just finished scanning so we ought to wait a bit rather than just// pile straight in to do some workif (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debug("PeriodicRecovery: background thread backing off");}doPeriodicWait();// if we got told to stop then do sofinished = (getMode() == Mode.TERMINATED);}} else {// status == INACTIVE so we can go ahead and scan if scanning is enabledswitch (getMode()) {case ENABLED:// ok grab our chance to be the scanning threadif (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debug("PeriodicRecovery: background thread Status <== SCANNING");}setStatus(Status.SCANNING);// must kick any other waiting threads_stateLock.notifyAll();workToDo = true;break;case SUSPENDED:// we need to wait while we are suspendedif (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debug("PeriodicRecovery: background thread wait while SUSPENDED");}doSuspendedWait();// we come out of here with the lock and either ENABLED or TERMINATEDfinished = (getMode() == Mode.TERMINATED);break;case TERMINATED:finished = true;break;}}}// its ok to start work if requested -- we cannot be stopped now by a mode change to SUSPEND// or TERMINATE until we get through phase 1 and maybe phase 2 if we are luckyif (workToDo) {// ok it is now this thread's turn to run a scan. before starting we check if there is a// worker waiting and reset the waiting flag. we will check again after the scan has// completed to see if a worker request has come in after starting this scan.// if so we avoid notifying the worker ensuring a requst is only confirmed when a// full scan has happened afetr the request was madeboolean notifyRequired;synchronized(_stateLock) {notifyRequired = _workerScanRequested;_workerScanRequested = false;}// we are in state SCANNING so actually do the scanif (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debug("PeriodicRecovery: background thread scanning");}doWorkInternal();// clear the SCANNING state now we have donesynchronized(_stateLock) {if (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debug("PeriodicRecovery: background thread Status <== INACTIVE");}setStatus(Status.INACTIVE);// must kick any other waiting threads_stateLock.notifyAll();// check if we need to notify a listener worker that we just finished  a scanif (notifyRequired && !_workerScanRequested) {notifyWorker();}if (getMode() == Mode.ENABLED && !_workerScanRequested) {// we managed a full scan and scanning is still enabled// so wait a bit before the next attemptif (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debug("PeriodicRecovery: background thread backing off");}doPeriodicWait();}finished = (getMode() == Mode.TERMINATED);}}} while (!finished);// make sure the worker thread is not wedged waiting for a scan to completesynchronized(_stateLock) {if (_workerScanRequested) {notifyWorker();}}if (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debug("PeriodicRecovery: background thread exiting");}}
  • 别被吓到了,我们重点来关注 doWorkInternal(); 我们来看看这个方法。

         //获取所有的RecoveryModule ,然后一个一个执行Vector copyOfModules = getModules();Enumeration modules = copyOfModules.elements();while (modules.hasMoreElements()){RecoveryModule m = (RecoveryModule) modules.nextElement();// we need to ensure we use the class loader context of the recovery module while we are executing// its methodsClassLoader cl = switchClassLoader(m);try {m.periodicWorkFirstPass();} finally {restoreClassLoader(cl);}if (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debug(" ");}}// take the lock again so we can do a backoff wait on itsynchronized (_stateLock) {// we have to wait for a bit to avoid catching (too many)// transactions etc. that are really progressing quite happilydoBackoffWait();// we carry on scanning even if scanning is SUSPENDED because the suspending thread// might be waiting on us to complete and we don't want to risk deadlocking it by waiting// here for a resume.// if we have been TERMINATED we bail out now// n.b. if we give up here the caller is responsible for clearing the active scanif (getMode() == Mode.TERMINATED) {if (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debug("PeriodicRecovery: scan TERMINATED at phase 1");}return;}}// move on to phase 2if (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debug("Periodic recovery second pass at "+_theTimestamper.format(new Date()));}modules = copyOfModules.elements();while (modules.hasMoreElements()){RecoveryModule m = (RecoveryModule) modules.nextElement();ClassLoader cl = switchClassLoader(m);try {m.periodicWorkSecondPass();} finally {restoreClassLoader(cl);}if (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debugf("PeriodicRecovery: recovery module '%s' second pass processed", m);}}
  • 首先会获取框架所有的RecoveryModule类,然后一个一个执行,我们先来看看这个类:

public interface RecoveryModule
{/*** Called by the RecoveryManager at start up, and then* PERIODIC_RECOVERY_PERIOD seconds after the completion, for all RecoveryModules,* of the second pass*/public void periodicWorkFirstPass ();/*** Called by the RecoveryManager RECOVERY_BACKOFF_PERIOD seconds* after the completion of the first pass*/public void periodicWorkSecondPass ();
}

RecoveryModule的实现类有 XARecoveryModule ,AtomicActionRecoveryModule,SubordinateAtomicActionRecoveryModule,CommitMarkableResourceRecordRecoveryModule。等4个实现类。

恢复执行第一个阶段
  • XARecoveryModule :
    它的作用就是执行XA recovery 命令从RM,获取 Xid数组。然后缓存起来。核心代码为:

//从数据库获取
trans = xares.recover(XAResource.TMSTARTRSCAN);
//缓存刷新refreshXidScansForEquivalentXAResourceImpl(xares, trans);
  • AtomicActionRecoveryModule:
    从事务日志里面获取需要恢复的UID,具体代码为:

  // Transaction typeboolean AtomicActions = false ;// uids per transaction typeInputObjectState aa_uids = new InputObjectState() ;try{if (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debug("AtomicActionRecoveryModule first pass");}AtomicActions = _recoveryStore.allObjUids( _transactionType, aa_uids );}catch ( ObjectStoreException ex ) {tsLogger.i18NLogger.warn_recovery_AtomicActionRecoveryModule_1(ex);}if ( AtomicActions ){_transactionUidVector = processTransactions( aa_uids ) ;}
恢复执行第二个阶段

首先执行的代码为 :

 // move on to phase 2if (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debug("Periodic recovery second pass at "+_theTimestamper.format(new Date()));}modules = copyOfModules.elements();while (modules.hasMoreElements()){RecoveryModule m = (RecoveryModule) modules.nextElement();ClassLoader cl = switchClassLoader(m);try {m.periodicWorkSecondPass();} finally {restoreClassLoader(cl);}if (tsLogger.logger.isDebugEnabled()) {tsLogger.logger.debugf("PeriodicRecovery: recovery module '%s' second pass processed", m);}}
  • AtomicActionRecoveryModule: 进入 processTransactionsStatus(),最终会调用到 com.arjuna.ats.arjuna.recovery.RecoverAtomicAction.replayPhase2()。我们来看看这个方法。

//省略无关代码if ( (_theStatus == ActionStatus.PREPARED) ||(_theStatus == ActionStatus.COMMITTING) ||(_theStatus == ActionStatus.COMMITTED) ||(_theStatus == ActionStatus.H_COMMIT) ||(_theStatus == ActionStatus.H_MIXED) ||(_theStatus == ActionStatus.H_HAZARD) ){super.phase2Commit( _reportHeuristics ) ;}else if ( (_theStatus == ActionStatus.ABORTED) ||(_theStatus == ActionStatus.H_ROLLBACK) ||(_theStatus == ActionStatus.ABORTING) ||(_theStatus == ActionStatus.ABORT_ONLY) ){super.phase2Abort( _reportHeuristics ) ;}
  • 判断事务状态,如果是需要commit阶段的状态,进行commit,否则进行rollback

  • XARecoveryModule : 尝试在进行恢复。核心代码为

private void bottomUpRecovery() {for (XAResource xaResource : _resources) {try {xaRecoverySecondPass(xaResource);} catch (Exception ex) {jtaLogger.i18NLogger.warn_recovery_getxaresource(ex);}}// JBTM-895 garbage collection is now done when we return XAResources {@see XARecoveryModule#getNewXAResource(XAResourceRecord)}// JBTM-924 requires this here garbage collection, see JBTM-1155:if (_xidScans != null) {Set<XAResource> keys = new HashSet<XAResource>(_xidScans.keySet());for(XAResource theKey : keys) {RecoveryXids recoveryXids = _xidScans.get(theKey);if(recoveryXids.isStale()) {_xidScans.remove(theKey);}}}}

文章到此,已经写的很长很多了,我们分析了ShardingSphere对于XA方案,提供了一套SPI解决方案,对Atomikos进行了整合,也分析了Atomikos初始化流程,开始事务流程,获取连接流程,提交事务流程,回滚事务流程,事务恢复流程。
希望对大家理解XA的原理有所帮助。

关于我们

 Apache ShardingSphere是一套开源的分布式数据库中间件解决方案组成的生态圈,它由Sharding-JDBC、Sharding-Proxy和Sharding-Sidecar(规划中)这3款相互独立的产品组成。他们均提供标准化的数据分片、分布式事务、数据迁移、数据库治理和管控界面功能,可适用于如Java同构、异构语言、容器、云原生等各种多样化的应用场景。

Apache ShardingSphere不断践行Apache Way,致力于打造充满活力、规范、互助的社区!开源路上,我们欢迎你的加入。

项目地址:

https://github.com/apache/shardingsphere

更多信息请浏览官网

https://shardingsphere.apache.org/

作者介绍:肖宇,Apache ShardingSphere Committer,开源hmily分布式事务框架作者,
开源soul网关作者,热爱开源,追求写优雅代码。目前就职于京东数科,参与ShardingSphere的开源建设,以及分布式数据库的研发工作。

扫码关注

ShardingSphere

这篇关于Narayana-XA事务恢复(5)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/415702

相关文章

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

电脑桌面文件删除了怎么找回来?别急,快速恢复攻略在此

在日常使用电脑的过程中,我们经常会遇到这样的情况:一不小心,桌面上的某个重要文件被删除了。这时,大多数人可能会感到惊慌失措,不知所措。 其实,不必过于担心,因为有很多方法可以帮助我们找回被删除的桌面文件。下面,就让我们一起来了解一下这些恢复桌面文件的方法吧。 一、使用撤销操作 如果我们刚刚删除了桌面上的文件,并且还没有进行其他操作,那么可以尝试使用撤销操作来恢复文件。在键盘上同时按下“C

MySql 事务练习

事务(transaction) -- 事务 transaction-- 事务是一组操作的集合,是一个不可分割的工作单位,事务会将所有的操作作为一个整体一起向系统提交或撤销请求-- 事务的操作要么同时成功,要么同时失败-- MySql的事务默认是自动提交的,当执行一个DML语句,MySql会立即自动隐式提交事务-- 常见案例:银行转账-- 逻辑:A给B转账1000:1.查询

Lua 脚本在 Redis 中执行时的原子性以及与redis的事务的区别

在 Redis 中,Lua 脚本具有原子性是因为 Redis 保证在执行脚本时,脚本中的所有操作都会被当作一个不可分割的整体。具体来说,Redis 使用单线程的执行模型来处理命令,因此当 Lua 脚本在 Redis 中执行时,不会有其他命令打断脚本的执行过程。脚本中的所有操作都将连续执行,直到脚本执行完成后,Redis 才会继续处理其他客户端的请求。 Lua 脚本在 Redis 中原子性的原因

如何恢复回收站中已删除/清空的文件

回收站清空后如何恢复已删除的文件?是否可以恢复永久删除的文件?或者最糟糕的是,如果文件直接被删除怎么办?本文将向您展示清空回收站后恢复已删除数据的最佳方法。 回收站清空后如何恢复已删除的文件? “回收站清空后我还能恢复已删除的文件吗?” 答案是肯定的,但是在这种情况下您将需要一个  回收站恢复工具 来从回收站中检索文件: 错误/永久删除回收站或任何数字存储设备中的文件 直接删除的文件/

海鸥相机存储卡格式化如何恢复数据

在摄影的世界里,‌每一张照片都承载着独特的记忆与故事。‌然而,‌当我们不慎将海鸥相机的存储卡格式化后,‌那些珍贵的瞬间似乎瞬间消逝,‌让人心急如焚。‌但请不要绝望,‌数据恢复并非遥不可及。‌本文将详细介绍在海鸥相机存储卡格式化后,‌如何高效地恢复丢失的数据,‌帮助您重新找回那些宝贵的记忆。‌ 图片来源于网络,如有侵权请告知 一、‌回忆备份情况 ‌海鸥相机存储卡格式化如何恢复数据?在意

想要从OPPO手机恢复数据?免费OPPO照片视频恢复软件

此实用程序可帮助那些寻找以下内容的用户: 在OPPO手机中格式化存储卡后可以恢复图片吗?我删除了 OPPO上的视频和图片,我感觉很糟糕,因为里面有我在拉斯维加斯拍摄的视频和照片 免费OPPO照片视频恢复软件 您能恢复OPPO上已删除的照片吗?我不小心格式化了OPPO SD 卡,有希望恢复已删除的照片吗? 救命!我在清理时删除了我的照片,我的问题是是否有任何免费软件可以从OPPO中恢复已

spring事务属性的xml格式配置

实际是使用代理做的事务优化 <!--配置事务的属性--><tx:advice id="txAdvice" transaction-manager="transactionManager"> <tx:attributes> <!--匹配所有以add开头的方法--><tx:method name="add*" propagation="REQUIRED" /> <tx:metho

Spring Cloud整合Seata实现分布式事务

文章目录 1.Seata1.1 官网1.2 下载1.3 通过安装包运行seata1.3.1 解压seata-server-1.3.0.zip1.3.2 修改 conf/file.conf 配置文件1.3.3 修改conf/registry.conf配置文件1.3.4 添加seata配置信息到nacos1.3.5 配置seata服务端数据库表结构1.3.6 启动seata 2.Spring

分布式 事务的几种实现方案

背景 四月初,去面试了本市的一家之前在做办公室无人货架的公司,虽然他们现在在面临着转型,但是对于我这种想从传统企业往互联网行业走的孩子来说,还是比较有吸引力的。 在面试过程中就提到了分布式事务问题。我又一次在没有好好整理的问题上吃了亏,记录一下,还是长记性 !!! 先看面试过程 面试官先是在纸上先画了这样一张图: 让我看这张图按照上面的流程走,有没有什么问题?面试官并没有直接说出来这里面