​Atomikos-XA 事务恢复(3)

2023-11-23 05:32
文章标签 恢复 事务 xa atomikos

本文主要是介绍​Atomikos-XA 事务恢复(3),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Atomikos-XA 事务恢复(3)

Apache ShardingSphere 是一套开源的分布式数据库中间件解决方案组成的生态圈,它由 JDBC、Proxy 和 Sidecar(规划中)这 3 款相互独立,却又能够混合部署配合使用的产品组成。它们均提供标准化的数据分片、分布式事务和数据库治理功能,可适用于如 Java 同构、异构语言、云原生等各种多样化的应用场景。

ShardingSphere 已于2020年4月16日成为 Apache 软件基金会的顶级项目。

Atomikos-XA事务恢复

说事务恢复流程之前,我们来讨论下,会啥会出现事务恢复?XA二阶段提交协议不是强一致性的吗?要解答这个问题,我们就要来看看XA二阶段协议有什么问题?

问题一 :单点故障

由于协调者的重要性,一旦协调者TM发生故障。参与者RM会一直阻塞下去。尤其在第二阶段,协调者发生故障,那么所有的参与者还都处于锁定事务资源的状态中,而无法继续完成事务操作。(如果是协调者挂掉,可以重新选举一个协调者,但是无法解决因为协调者宕机导致的参与者处于阻塞状态的问题)

问题二 :数据不一致

数据不一致。在二阶段提交的阶段二中,当协调者向参与者发送commit请求之后,发生了局部网络异常或者在发送commit请求过程中协调者发生了故障,这回导致只有一部分参与者接受到了commit请求。而在这部分参与者接到commit请求之后就会执行commit操作。但是其他部分未接到commit请求的机器则无法执行事务提交。于是整个分布式系统便出现了数据不一致性的现象。

如何解决?

解决的方案简单,就是我们在事务的操作的每一步,我们都需要对事务状态的日志进行人为的记录,我们可以把日志记录存储在我们想存储的地方,可以是本地存储,也可以中心化的存储。atomikos的开源版本,我们之前也分析了,它是使用内存 + file的方式,存储在本地,这样的话,如果在一个集群系统里面,如果有节点宕机,日志又存储在本地,所以事务不能及时的恢复(需要重启服务)。

Atomikos 多场景下事务恢复。

Atomikos 提供了二种方式,来应对不同场景下的异常情况。

  • 场景一:服务节点不宕机,因为其他的原因,产生需要事务恢复的情况。这个时候才要定时任务进行恢复。
    具体的代码 com.atomikos.icatch.imp.TransactionServiceImp.init() 方法,会初始化一个定时任务,进行事务的恢复。

public synchronized void init ( Properties properties ) throws SysException{shutdownInProgress_ = false;control_ = new com.atomikos.icatch.admin.imp.LogControlImp ( (AdminLog) this.recoveryLog );ConfigProperties configProperties = new ConfigProperties(properties);long recoveryDelay = configProperties.getRecoveryDelay();recoveryTimer = new PooledAlarmTimer(recoveryDelay);recoveryTimer.addAlarmTimerListener(new AlarmTimerListener() {@Overridepublic void alarm(AlarmTimer timer) {//进行事务恢复performRecovery();}});TaskManager.SINGLETON.executeTask(recoveryTimer);initialized_ = true;}
  • 最终会进入com.atomikos.datasource.xa.XATransactionalResource.recover() 方法。

public void recover() {XaResourceRecoveryManager xaResourceRecoveryManager = XaResourceRecoveryManager.getInstance();if (xaResourceRecoveryManager != null) { //null for LogCloud recoverytry {xaResourceRecoveryManager.recover(getXAResource());} catch (Exception e) {refreshXAResource(); //cf case 156968}}}
  • 场景二: 当服务节点宕机重启动过程中进行事务的恢复。具体实现在com.atomikos.datasource.xa.XATransactionalResource.setRecoveryService()方法里面。

@Overridepublic void setRecoveryService ( RecoveryService recoveryService )throws ResourceException{if ( recoveryService != null ) {if ( LOGGER.isTraceEnabled() ) LOGGER.logTrace ( "Installing recovery service on resource "+ getName () );this.branchIdentifier=recoveryService.getName();//进行事务恢复recover();}
}
com.atomikos.datasource.xa.XATransactionalResource.recover() 流程详解。

主代码:

public void recover(XAResource xaResource) throws XAException {// 根据XA recovery 协议获取 xidList<XID> xidsToRecover = retrievePreparedXidsFromXaResource(xaResource);Collection<XID> xidsToCommit;try {// xid 与日志记录的xid进行匹配xidsToCommit = retrieveExpiredCommittingXidsFromLog();for (XID xid : xidsToRecover) {if (xidsToCommit.contains(xid)) {//执行 XA commit xid 进行提交replayCommit(xid, xaResource);} else {attemptPresumedAbort(xid, xaResource);}}} catch (LogException couldNotRetrieveCommittingXids) {LOGGER.logWarning("Transient error while recovering - will retry later...", couldNotRetrieveCommittingXids);}}
  • 我们来看一下如何根据 XA recovery 协议获取RM端存储的xid。进入方法 retrievePreparedXidsFromXaResource(xaResource), 最后进入 com.atomikos.datasource.xa.RecoveryScan.recoverXids()方法。

public static List<XID> recoverXids(XAResource xaResource, XidSelector selector) throws XAException {List<XID> ret = new ArrayList<XID>();boolean done = false;int flags = XAResource.TMSTARTRSCAN;Xid[] xidsFromLastScan = null;List<XID> allRecoveredXidsSoFar = new ArrayList<XID>();do {xidsFromLastScan = xaResource.recover(flags);flags = XAResource.TMNOFLAGS;done = (xidsFromLastScan == null || xidsFromLastScan.length == 0);if (!done) {// TEMPTATIVELY SET done TO TRUE// TO TOLERATE ORACLE 8.1.7 INFINITE// LOOP (ALWAYS RETURNS SAME RECOVER// SET). IF A NEW SET OF XIDS IS RETURNED// THEN done WILL BE RESET TO FALSEdone = true;for ( int i = 0; i < xidsFromLastScan.length; i++ ) {XID xid = new XID ( xidsFromLastScan[i] );// our own XID implements equals and hashCode properlyif (!allRecoveredXidsSoFar.contains(xid)) {// a new xid is returned -> we can not be in a recovery loop -> go onallRecoveredXidsSoFar.add(xid);done = false;if (selector.selects(xid)) {ret.add(xid);}}}}} while (!done);return ret;}
  • 我们重点关注xidsFromLastScan = xaResource.recover(flags); 这个方法,如果我们使用MySQL,那么久会进入 MysqlXAConnection.recover()方法。执行 XA recovery xid 语句来获取 xid。

protected static Xid[] recover(Connection c, int flag) throws XAException {        
/** The XA RECOVER statement returns information for those         XA transactions on the MySQL server that are in the PRE        PARED state. (See Section 13.4.7.2, ???XA* Transaction States???.) The output includes a row for e        ach such XA transaction on the server, regardless of wh        ich client started it.** XA RECOVER output rows look like this (for an example x        id value consisting of the parts 'abc', 'def', and 7):** mysql> XA RECOVER;* +----------+--------------+--------------+--------+* | formatID | gtrid_length | bqual_length | data |* +----------+--------------+--------------+--------+* | 7 | 3 | 3 | abcdef |* +----------+--------------+--------------+--------+** The output columns have the following meanings:** formatID is the formatID part of the transaction xid* gtrid_length is the length in bytes of the gtrid partof the xid* bqual_length is the length in bytes of the bqual partof the xid* data is the concatenation of the gtrid and bqual parts         of the xid*/boolean startRscan = ((flag & TMSTARTRSCAN) > 0);boolean endRscan = ((flag & TMENDRSCAN) > 0);if (!startRscan && !endRscan && flag != TMNOFLAGS) {throw new MysqlXAException(XAException.XAER_INVAL, Messages.getString("MysqlXAConnection.001"), null);
}
//
// We return all recovered XIDs at once, so if not  TMSTARTRSCAN, return no new XIDs
//
// We don't attempt to maintain state to check for TMNOFLAGS "outside" of a scan
//if (!startRscan) {return new Xid[0];}ResultSet rs = null;Statement stmt = null;List<MysqlXid> recoveredXidList = new ArrayList<MysqlXid>();try {// TODO: Cache this for lifetime of XAConnectionstmt = c.createStatement();rs = stmt.executeQuery("XA RECOVER");while (rs.next()) {final int formatId = rs.getInt(1);int gtridLength = rs.getInt(2);int bqualLength = rs.getInt(3);byte[] gtridAndBqual = rs.getBytes(4);final byte[] gtrid = new byte[gtridLength];final byte[] bqual = new byte[bqualLength];if (gtridAndBqual.length != (gtridLength + bqualLength)) {throw new MysqlXAException(XAException.XA_RBPROTO, Messages.getString("MysqlXAConnection.002"), null);
}System.arraycopy(gtridAndBqual, 0, gtrid, 0, gtridLength);System.arraycopy(gtridAndBqual, gtridLength, bqual, 0, bqualLength);recoveredXidList.add(new MysqlXid(gtrid, bqual, formatId));}} catch (SQLException sqlEx) {throw mapXAExceptionFromSQLException(sqlEx);} finally {if (rs != null) {try {rs.close();} catch (SQLException sqlEx) {throw mapXAExceptionFromSQLException(sqlEx);}}if (stmt != null) {try {stmt.close();} catch (SQLException sqlEx) {throw mapXAExceptionFromSQLException(sqlEx);}}}int numXids = recoveredXidList.size();Xid[] asXids = new Xid[numXids];Object[] asObjects = recoveredXidList.toArray();for (int i = 0; i < numXids; i++) {asXids[i] = (Xid) asObjects[i];}return asXids;}
  • 这里要注意如果Mysql的版本 <5.7.7 ,则不会有任何数据,在以后的版本中Mysql进行了修复,因此如果我们想要使用MySQL充当RM,版本必须 >= 5.7.7 ,原因是:

MySQL 5.6版本在客户端退出的时候,自动把已经prepare的事务回滚了,那么MySQL为什么要这样做?这主要取决于MySQL的内部实现,MySQL 5.7以前的版本,对于prepare的事务,MySQL是不会记录binlog的(官方说是减少fsync,起到了优化的作用)。只有当分布式事务提交的时候才会把前面的操作写入binlog信息,所以对于binlog来说,分布式事务与普通的事务没有区别,而prepare以前的操作信息都保存在连接的IO_CACHE中,如果这个时候客户端退出了,以前的binlog信息都会被丢失,再次重连后允许提交的话,会造成Binlog丢失,从而造成主从数据的不一致,所以官方在客户端退出的时候直接把已经prepare的事务都回滚了!

  • 回到主线再从自己记录的事务日志里面获取XID

Collection<XID> xidsToCommit = retrieveExpiredCommittingXidsFromLog();
我们来看下获取事务日志里面的XID的retrieveExpiredCommittingXidsFromLog()方法。然后进入com.atomikos.recovery.imp.RecoveryLogImp.getCommittingParticipants()方法。public Collection<ParticipantLogEntry> getCommittingParticipants()throws LogReadException {Collection<ParticipantLogEntry> committingParticipants = new HashSet<ParticipantLogEntry>();Collection<CoordinatorLogEntry> committingCoordinatorLogEntries = repository.findAllCommittingCoordinatorLogEntries();for (CoordinatorLogEntry coordinatorLogEntry : committingCoordinatorLogEntries) {for (ParticipantLogEntry participantLogEntry : coordinatorLogEntry.participants) {committingParticipants.add(participantLogEntry);}}return committingParticipants;}到这里我们来简单介绍一下,事务日志的存储结构。首先是
CoordinatorLogEntry,这是一次XA事务的所有信息实体类。
public class CoordinatorLogEntry implements Serializable {//全局事务idpublic final String id;//是否已经提交public final boolean wasCommitted;/*** Only for subtransactions, null otherwise.*/public final String superiorCoordinatorId;//参与者集合public final ParticipantLogEntry[] participants;}再来看一下参与者实体类 ParticipantLogEntry :
public class ParticipantLogEntry implements Serializable {private static final long serialVersionUID = 1728296701394899871L;/*** The ID of the global transaction as known by the transaction core.*/public final String coordinatorId;/*** Identifies the participant within the global transaction.*/public final String uri;/*** When does this participant expire (expressed in millis since Jan 1, 1970)?*/public final long expires;/*** Best-known state of the participant.*/public final TxState state;/*** For diagnostic purposes, null if not relevant.*/public final String resourceName;}回到com.atomikos.recovery.xa.DefaultXaRecoveryLog.getExpiredCommittingXids()
方法,可以到获取了一次XA事务过程中,存储的事务日志中的xid。
public Set<XID> getExpiredCommittingXids() throws LogReadException {Set<XID> ret = new HashSet<XID>();Collection<ParticipantLogEntry> entries = log.getCommittingParticipants();for (ParticipantLogEntry entry : entries) {if (expired(entry) && !http(entry)) {XID xid = new XID(entry.coordinatorId, entry.uri);ret.add(xid);}}return ret;}如果从RM中通过XA recovery取出的XID,包含在从事务日志中取出的XID,
则进行commit,否则进行rollback.
List<XID> xidsToRecover = retrievePreparedXidsFromXaResource(xaResource);Collection<XID> xidsToCommit;try {xidsToCommit = retrieveExpiredCommittingXidsFromLog();for (XID xid : xidsToRecover) {if (xidsToCommit.contains(xid)) {replayCommit(xid, xaResource);} else {attemptPresumedAbort(xid, xaResource);}}} catch (LogException couldNotRetrieveCommittingXids) {LOGGER.logWarning("Transient error while recovering - will retry later...", couldNotRetrieveCommittingXids);}replayCommit 方法如下:
private void replayCommit(XID xid, XAResource xaResource) {if (LOGGER.isDebugEnabled()) LOGGER.logDebug("Replaying commit of xid: " + xid);try {//进行事务提交xaResource.commit(xid, false);//更新事务日志log.terminated(xid);} catch (XAException e) {if (alreadyHeuristicallyTerminatedByResource(e)) {handleHeuristicTerminationByResource(xid, xaResource, e, true);} else if (xidTerminatedInResourceByConcurrentCommit(e)) {log.terminated(xid);} else {LOGGER.logWarning("Transient error while replaying commit - will retry later...", e);}}}
attemptPresumedAbort(xid, xaResource); 方法如下:
private void attemptPresumedAbort(XID xid, XAResource xaResource) {try {log.presumedAborting(xid);if (LOGGER.isDebugEnabled()) LOGGER.logDebug("Presumed abort of xid: " + xid);try {//进行回滚xaResource.rollback(xid);//更新日志状态log.terminated(xid);} catch (XAException e) {if (alreadyHeuristicallyTerminatedByResource(e)) {handleHeuristicTerminationByResource(xid, xaResource, e, false);} else if (xidTerminatedInResourceByConcurrentRollback(e)) {log.terminated(xid);} else {LOGGER.logWarning("Unexpected exception during recovery - ignoring to retry later...", e);}}} catch (IllegalStateException presumedAbortNotAllowedInCurrentLogState) {// ignore to retry later if necessary} catch (LogException logWriteException) {LOGGER.logWarning("log write failed for Xid: "+xid+", ignoring to retry later", logWriteException);}}

文章到此,已经写的很长很多了,我们分析了ShardingSphere对于XA方案,提供了一套SPI解决方案,对Atomikos进行了整合,也分析了Atomikos初始化流程,开始事务流程,获取连接流程,提交事务流程,回滚事务流程,事务恢复流程。
希望对大家理解XA的原理有所帮助。

关于我们

 Apache ShardingSphere是一套开源的分布式数据库中间件解决方案组成的生态圈,它由Sharding-JDBC、Sharding-Proxy和Sharding-Sidecar(规划中)这3款相互独立的产品组成。他们均提供标准化的数据分片、分布式事务、数据迁移、数据库治理和管控界面功能,可适用于如Java同构、异构语言、容器、云原生等各种多样化的应用场景。

Apache ShardingSphere不断践行Apache Way,致力于打造充满活力、规范、互助的社区!开源路上,我们欢迎你的加入。

项目地址:

https://github.com/apache/shardingsphere

更多信息请浏览官网

https://shardingsphere.apache.org/

作者介绍:肖宇,Apache ShardingSphere Committer,开源hmily分布式事务框架作者,
开源soul网关作者,热爱开源,追求写优雅代码。目前就职于京东数科,参与ShardingSphere的开源建设,以及分布式数据库的研发工作。

扫码关注

ShardingSphere

这篇关于​Atomikos-XA 事务恢复(3)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/415699

相关文章

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

电脑桌面文件删除了怎么找回来?别急,快速恢复攻略在此

在日常使用电脑的过程中,我们经常会遇到这样的情况:一不小心,桌面上的某个重要文件被删除了。这时,大多数人可能会感到惊慌失措,不知所措。 其实,不必过于担心,因为有很多方法可以帮助我们找回被删除的桌面文件。下面,就让我们一起来了解一下这些恢复桌面文件的方法吧。 一、使用撤销操作 如果我们刚刚删除了桌面上的文件,并且还没有进行其他操作,那么可以尝试使用撤销操作来恢复文件。在键盘上同时按下“C

MySql 事务练习

事务(transaction) -- 事务 transaction-- 事务是一组操作的集合,是一个不可分割的工作单位,事务会将所有的操作作为一个整体一起向系统提交或撤销请求-- 事务的操作要么同时成功,要么同时失败-- MySql的事务默认是自动提交的,当执行一个DML语句,MySql会立即自动隐式提交事务-- 常见案例:银行转账-- 逻辑:A给B转账1000:1.查询

Lua 脚本在 Redis 中执行时的原子性以及与redis的事务的区别

在 Redis 中,Lua 脚本具有原子性是因为 Redis 保证在执行脚本时,脚本中的所有操作都会被当作一个不可分割的整体。具体来说,Redis 使用单线程的执行模型来处理命令,因此当 Lua 脚本在 Redis 中执行时,不会有其他命令打断脚本的执行过程。脚本中的所有操作都将连续执行,直到脚本执行完成后,Redis 才会继续处理其他客户端的请求。 Lua 脚本在 Redis 中原子性的原因

如何恢复回收站中已删除/清空的文件

回收站清空后如何恢复已删除的文件?是否可以恢复永久删除的文件?或者最糟糕的是,如果文件直接被删除怎么办?本文将向您展示清空回收站后恢复已删除数据的最佳方法。 回收站清空后如何恢复已删除的文件? “回收站清空后我还能恢复已删除的文件吗?” 答案是肯定的,但是在这种情况下您将需要一个  回收站恢复工具 来从回收站中检索文件: 错误/永久删除回收站或任何数字存储设备中的文件 直接删除的文件/

海鸥相机存储卡格式化如何恢复数据

在摄影的世界里,‌每一张照片都承载着独特的记忆与故事。‌然而,‌当我们不慎将海鸥相机的存储卡格式化后,‌那些珍贵的瞬间似乎瞬间消逝,‌让人心急如焚。‌但请不要绝望,‌数据恢复并非遥不可及。‌本文将详细介绍在海鸥相机存储卡格式化后,‌如何高效地恢复丢失的数据,‌帮助您重新找回那些宝贵的记忆。‌ 图片来源于网络,如有侵权请告知 一、‌回忆备份情况 ‌海鸥相机存储卡格式化如何恢复数据?在意

想要从OPPO手机恢复数据?免费OPPO照片视频恢复软件

此实用程序可帮助那些寻找以下内容的用户: 在OPPO手机中格式化存储卡后可以恢复图片吗?我删除了 OPPO上的视频和图片,我感觉很糟糕,因为里面有我在拉斯维加斯拍摄的视频和照片 免费OPPO照片视频恢复软件 您能恢复OPPO上已删除的照片吗?我不小心格式化了OPPO SD 卡,有希望恢复已删除的照片吗? 救命!我在清理时删除了我的照片,我的问题是是否有任何免费软件可以从OPPO中恢复已

spring事务属性的xml格式配置

实际是使用代理做的事务优化 <!--配置事务的属性--><tx:advice id="txAdvice" transaction-manager="transactionManager"> <tx:attributes> <!--匹配所有以add开头的方法--><tx:method name="add*" propagation="REQUIRED" /> <tx:metho

Spring Cloud整合Seata实现分布式事务

文章目录 1.Seata1.1 官网1.2 下载1.3 通过安装包运行seata1.3.1 解压seata-server-1.3.0.zip1.3.2 修改 conf/file.conf 配置文件1.3.3 修改conf/registry.conf配置文件1.3.4 添加seata配置信息到nacos1.3.5 配置seata服务端数据库表结构1.3.6 启动seata 2.Spring

分布式 事务的几种实现方案

背景 四月初,去面试了本市的一家之前在做办公室无人货架的公司,虽然他们现在在面临着转型,但是对于我这种想从传统企业往互联网行业走的孩子来说,还是比较有吸引力的。 在面试过程中就提到了分布式事务问题。我又一次在没有好好整理的问题上吃了亏,记录一下,还是长记性 !!! 先看面试过程 面试官先是在纸上先画了这样一张图: 让我看这张图按照上面的流程走,有没有什么问题?面试官并没有直接说出来这里面