使用Java开源组件Atomikos开发分布式事务应用

2024-04-24 14:08

本文主要是介绍使用Java开源组件Atomikos开发分布式事务应用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Atomikos是一个公司的名字,AtomikosTransactionsEssentials是其开源的分布式事务软件包,而ExtremeTransactions是商业的分布式事务软件包。TransactionsEssentials是基于apache-license的,是JTA/XA的开源实现,支持Java Application和J2EE应用。
下面以AtomikosTransactionsEssentials-3.4.2(可以在 http://www.atomikos.com下载)为例,说明其用法。
需要的jar包:jta.jar、transactions-essentials-all.jar。
Atomikos默认在classpath下使用名为transactions.properties的配置文件,如果找不到,则使用默认的配置参数。下面给一个transactions.properties的例子,可以根据自己的需要修改:
#SAMPLE PROPERTIES FILE FOR THE TRANSACTION SERVICE
#THIS FILE ILLUSTRATES THE DIFFERENT SETTINGS FOR THE TRANSACTION MANAGER
#UNCOMMENT THE ASSIGNMENTS TO OVERRIDE DEFAULT VALUES;
#Required: factory class name for the transaction service core.
#
com.atomikos.icatch.service=com.atomikos.icatch.standalone.UserTransactionServiceFactory
#
#Set name of file where messages are output
#
#com.atomikos.icatch.console_file_name = tm.out
#Size limit (in bytes) for the console file;
#negative means unlimited.
#
#com.atomikos.icatch.console_file_limit=-1
#For size-limited console files, this option
#specifies a number of rotating files to
#maintain.
#
#com.atomikos.icatch.console_file_count=1
#Set the number of log writes between checkpoints
#
#com.atomikos.icatch.checkpoint_interval=500
#Set output directory where console file and other files are to be put
#make sure this directory exists!
#
#com.atomikos.icatch.output_dir = ./
#Set directory of log files; make sure this directory exists!
#
#com.atomikos.icatch.log_base_dir = ./
#Set base name of log file
#this name will be used as the first part of
#the system-generated log file name
#
#com.atomikos.icatch.log_base_name = tmlog
#Set the max number of active local transactions
#or -1 for unlimited.
#
#com.atomikos.icatch.max_actives = 50
#Set the max timeout (in milliseconds) for local transactions
#
#com.atomikos.icatch.max_timeout = 300000
#The globally unique name of this transaction manager process
#override this value with a globally unique name
#
#com.atomikos.icatch.tm_unique_name = tm
#Do we want to use parallel subtransactions? JTA's default
#is NO for J2EE compatibility.
#
#com.atomikos.icatch.serial_jta_transactions=true
#If you want to do explicit resource registration then
#you need to set this value to false. See later in
#this manual for what explicit resource registration means.
#
#com.atomikos.icatch.automatic_resource_registration=true
#Set this to WARN, INFO or DEBUG to control the granularity
#of output to the console file.
#
#com.atomikos.icatch.console_log_level=WARN
#Do you want transaction logging to be enabled or not?
#If set to false, then no logging overhead will be done
#at the risk of losing data after restart or crash.
#
#com.atomikos.icatch.enable_logging=true
#Should two-phase commit be done in (multi-)threaded mode or not?
#
#com.atomikos.icatch.threaded_2pc=true
#Should exit of the VM force shutdown of the transaction core?
#
#com.atomikos.icatch.force_shutdown_on_vm_exit=false
#Should the logs be protected by a .lck file on startup?
#
#com.atomikos.icatch.lock_logs=true
Atomikos TransactionsEssentials支持3种使用方式,可以根据自己的情况选用,下面给出每种方式的使用场合和一个代码示例。
一、使用JDBC/JMS和UserTransaction,这是最直接和最简单的使用方式,使用Atomikos内置的JDBC、JMS适配器。示例如下:
package demo.atomikos;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Properties;

import javax.transaction.UserTransaction;

import com.atomikos.icatch.jta.UserTransactionImp;
import com.atomikos.jdbc.AtomikosDataSourceBean;

/**
*
*/
public class UserTransactionUtil {

public static UserTransaction getUserTransaction() {
UserTransaction utx = new UserTransactionImp();
return utx;
}

private static AtomikosDataSourceBean dsBean;

private static AtomikosDataSourceBean getDataSource() {
if (dsBean != null) return dsBean;
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setUniqueResourceName("db");
ds.setXaDataSourceClassName("oracle.jdbc.xa.client.OracleXADataSource");
Properties p = new Properties();
p.setProperty("user", "db_user_name" );
p.setProperty("password", "db_user_pwd");
p.setProperty("URL", "jdbc:oracle:thin:@192.168.0.10:1521:oradb");
ds.setXaProperties(p);
ds.setPoolSize(5);
dsBean = ds;
return dsBean;
}

public static Connection getDbConnection() throws SQLException{
Connection conn = getDataSource().getConnection();
return conn;
}

private static AtomikosDataSourceBean dsBean1;

private static AtomikosDataSourceBean getDataSource1() {
if (dsBean1 != null) return dsBean1;
AtomikosDataSourceBean ds = new AtomikosDataSourceBean();
ds.setUniqueResourceName("db1");
ds.setXaDataSourceClassName("oracle.jdbc.xa.client.OracleXADataSource");
Properties p = new Properties();
p.setProperty("user", "db_user_name" );
p.setProperty("password", "db_user_pwd");
p.setProperty("URL", "jdbc:oracle:thin:@192.168.0.11:1521:oradb1");
ds.setXaProperties(p);
ds.setPoolSize(5);
dsBean1 = ds;
return dsBean1;
}

public static Connection getDb1Connection() throws SQLException{
Connection conn = getDataSource1().getConnection();
return conn;
}

public static void main(String[] args) {
UserTransaction utx = getUserTransaction();
boolean rollback = false;
try {
//begin a transaction
utx.begin();

//execute db operation
Connection conn = null;
Connection conn1 = null;
Statement stmt = null;
Statement stmt1 = null;
try {
conn = getDbConnection();
conn1 = getDb1Connection();

stmt = conn.createStatement();
stmt.executeUpdate("insert into t values(1,'23')");

stmt1 = conn1.createStatement();
stmt1.executeUpdate("insert into t values(1,'123456789')");

}
catch(Exception e) {
throw e;
}
finally {
if (stmt != null) stmt.close();
if (conn != null) conn.close();
if (stmt1 != null) stmt1.close();
if (conn1 != null) conn1.close();
}
}
catch(Exception e) {
//an exception means we should not commit
rollback = true;
e.printStackTrace();
}
finally {
try {
//commit or rollback the transaction
if ( !rollback ) utx.commit();
else utx.rollback();
}
catch(Exception e) {
e.printStackTrace();
}
}
}
}
二、使用JTA TransactionManager。这种方式不需要Atomikos内置的JDBC、JMS适配器,但需要在JTA/XA级别上添加、删除XA资源实例。示例如下:
package demo.atomikos;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import javax.sql.XAConnection;
import javax.transaction.Transaction;
import javax.transaction.xa.XAResource;

import oracle.jdbc.xa.client.OracleXADataSource;

import com.atomikos.icatch.jta.UserTransactionManager;

public class TransactionManagerUtil {
public static UserTransactionManager getUserTransactionManager() throws Exception {
return new UserTransactionManager();
}

private static OracleXADataSource xads;

private static OracleXADataSource getXADataSource() throws SQLException{
if (xads != null) return xads;
xads = new OracleXADataSource();
xads.setUser ("db_user_name");
xads.setPassword("db_user_pwd");
xads.setURL("jdbc:oracle:thin:@192.168.0.10:1521:oradb");
return xads;
}

public static XAConnection getXAConnection() throws SQLException{
OracleXADataSource ds = getXADataSource();
return ds.getXAConnection();
}

private static OracleXADataSource xads1;

private static OracleXADataSource getXADataSource1() throws SQLException{
if (xads1 != null) return xads1;
xads1 = new OracleXADataSource();
xads1.setUser ("db_user_name");
xads1.setPassword("db_user_pwd");
xads1.setURL("jdbc:oracle:thin:@192.168.0.11:1521:oradb1");
return xads1;
}

public static XAConnection getXAConnection1() throws SQLException{
OracleXADataSource ds = getXADataSource1();
return ds.getXAConnection();
}

public static void main(String[] args) {
try {
UserTransactionManager tm = getUserTransactionManager();

XAConnection xaconn = getXAConnection();
XAConnection xaconn1 = getXAConnection1();

boolean rollback = false;
try {
//begin and retrieve tx
tm.begin();
Transaction tx = tm.getTransaction();

//get the XAResourc from the JDBC connection
XAResource xares = xaconn.getXAResource();
XAResource xares1 = xaconn1.getXAResource();

//enlist the resource with the transaction
//NOTE: this will only work if you set the configuration parameter:
//com.atomikos.icatch.automatic_resource_registration=true
//or, alternatively, if you use the UserTransactionService
//integration mode
tx.enlistResource(xares);
tx.enlistResource(xares1);

//access the database, the work will be
//subject to the outcome of the current transaction
Connection conn = xaconn.getConnection();
Statement stmt = conn.createStatement();
stmt.executeUpdate("insert into t values(1,'1234567')");
stmt.close();
conn.close();
Connection conn1 = xaconn1.getConnection();
Statement stmt1 = conn1.createStatement();
stmt1.executeUpdate("insert into t values(1,'abc1234567890')");
stmt1.close();
conn1.close();

//delist the resource
tx.delistResource(xares, XAResource.TMSUCCESS);
tx.delistResource(xares1, XAResource.TMSUCCESS);
}
catch ( Exception e ) {
//an exception means we should not commit
rollback = true;
throw e;
}
finally {
//ALWAYS terminate the tx
if (rollback) tm.rollback();
else tm.commit();

//only now close the connection
//i.e., not until AFTER commit or rollback!
xaconn.close();
xaconn1.close();
}
}
catch (Exception e) {
e.printStackTrace();
}
}
}
三、使用Atomikos UserTransactionService。这是高级使用方式,可以控制事务服务的启动和关闭,并且可以控制资源的装配。示例如下:
package demo.atomikos;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import javax.sql.XAConnection;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import javax.transaction.xa.XAResource;

import oracle.jdbc.xa.client.OracleXADataSource;

import com.atomikos.datasource.xa.jdbc.JdbcTransactionalResource;
import com.atomikos.icatch.config.TSInitInfo;
import com.atomikos.icatch.config.UserTransactionService;
import com.atomikos.icatch.config.UserTransactionServiceImp;

public class UserTransactionServiceUtil {
public static UserTransactionService getUserTransactionService() throws Exception {
return new UserTransactionServiceImp();
}

private static OracleXADataSource xads;

private static OracleXADataSource getXADataSource() throws SQLException{
if (xads != null) return xads;
xads = new OracleXADataSource();
xads.setUser ("db_user_name");
xads.setPassword("db_user_pwd");
xads.setURL("jdbc:oracle:thin:@192.168.0.10:1521:oradb");
return xads;
}

public static XAConnection getXAConnection() throws SQLException{
OracleXADataSource ds = getXADataSource();
return ds.getXAConnection();
}

private static JdbcTransactionalResource jdbcResource;

public static JdbcTransactionalResource getJdbcTransactionalResource() throws SQLException{
if (jdbcResource != null) return jdbcResource;
jdbcResource = new JdbcTransactionalResource (
"db"
,getXADataSource()
,new com.atomikos.datasource.xa.OraXidFactory() //oracle db need this
);
return jdbcResource;
}

private static OracleXADataSource xads1;

private static OracleXADataSource getXADataSource1() throws SQLException{
if (xads1 != null) return xads1;
xads1 = new OracleXADataSource();
xads1.setUser ("db_user_name");
xads1.setPassword("db_user_pwd");
xads1.setURL("jdbc:oracle:thin:@192.168.0.11:1521:oradb1");
return xads1;
}

public static XAConnection getXAConnection1() throws SQLException{
OracleXADataSource ds = getXADataSource1();
return ds.getXAConnection();
}

private static JdbcTransactionalResource jdbcResource1;

public static JdbcTransactionalResource getJdbcTransactionalResource1() throws SQLException{
if (jdbcResource1 != null) return jdbcResource1;
jdbcResource1 = new JdbcTransactionalResource (
"db1"
,getXADataSource1()
,new com.atomikos.datasource.xa.OraXidFactory() //oracle db need this
);
return jdbcResource1;
}

public static void main(String[] args) {
try {

//Register the resource with the transaction service
//this is done through the UserTransaction handle.
//All UserTransaction instances are equivalent and each
//one can be used to register a resource at any time.
UserTransactionService uts = getUserTransactionService();
uts.registerResource(getJdbcTransactionalResource());
uts.registerResource(getJdbcTransactionalResource1());

//Initialize the UserTransactionService.
//This will start the TM and recover
//all registered resources; you could
//call this 'eager recovery' (as opposed to 'lazy recovery'
//for the simple xa demo).
TSInitInfo info = uts.createTSInitInfo();
//optionally set config properties on info
info.setProperty("com.atomikos.icatch.checkpoint_interval", "2000");
uts.init(info);

TransactionManager tm = uts.getTransactionManager();
tm.setTransactionTimeout(60);

XAConnection xaconn = getXAConnection();
XAConnection xaconn1 = getXAConnection1();

boolean rollback = false;

//begin and retrieve tx
tm.begin();
Transaction tx = tm.getTransaction();

//get the XAResourc from the JDBC connection
XAResource xares = xaconn.getXAResource();
XAResource xares1 = xaconn1.getXAResource();

Connection conn = xaconn.getConnection();
Connection conn1 = xaconn1.getConnection();
try {

//enlist the resource with the transaction
//NOTE: this will only work if you set the configuration parameter:
//com.atomikos.icatch.automatic_resource_registration=true
//or, alternatively, if you use the UserTransactionService
//integration mode
tx.enlistResource(xares);
tx.enlistResource(xares1);

//access the database, the work will be
//subject to the outcome of the current transaction
Statement stmt = conn.createStatement();
stmt.executeUpdate("insert into t values(1,'1234567')");
stmt.close();

Statement stmt1 = conn1.createStatement();
stmt1.executeUpdate("insert into t values(1,'abc')");
stmt1.close();

}
catch ( Exception e ) {
//an exception means we should not commit
rollback = true;
throw e;
}
finally {
int flag = XAResource.TMSUCCESS;
if (rollback) flag = XAResource.TMFAIL;

tx.delistResource(xares, flag);
tx.delistResource(xares1, flag);

conn.close();
conn1.close();

if (!rollback) tm.commit();
else tm.rollback();
}

uts.shutdown(false);
}
catch (Exception e) {
e.printStackTrace();
}
}
}
在使用的时候,也可以把资源等配置到应用服务器中,使用JNDI获取资源。也可以与spring集成。

这篇关于使用Java开源组件Atomikos开发分布式事务应用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/931974

相关文章

C++使用栈实现括号匹配的代码详解

《C++使用栈实现括号匹配的代码详解》在编程中,括号匹配是一个常见问题,尤其是在处理数学表达式、编译器解析等任务时,栈是一种非常适合处理此类问题的数据结构,能够精确地管理括号的匹配问题,本文将通过C+... 目录引言问题描述代码讲解代码解析栈的状态表示测试总结引言在编程中,括号匹配是一个常见问题,尤其是在

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

Java中String字符串使用避坑指南

《Java中String字符串使用避坑指南》Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容... 目录8个避坑点如下:1. 字符串的不可变性:每次修改都创建新对象2. 使用 == 比较字符串,陷阱满

Java判断多个时间段是否重合的方法小结

《Java判断多个时间段是否重合的方法小结》这篇文章主要为大家详细介绍了Java中判断多个时间段是否重合的方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录判断多个时间段是否有间隔判断时间段集合是否与某时间段重合判断多个时间段是否有间隔实体类内容public class D

Python使用国内镜像加速pip安装的方法讲解

《Python使用国内镜像加速pip安装的方法讲解》在Python开发中,pip是一个非常重要的工具,用于安装和管理Python的第三方库,然而,在国内使用pip安装依赖时,往往会因为网络问题而导致速... 目录一、pip 工具简介1. 什么是 pip?2. 什么是 -i 参数?二、国内镜像源的选择三、如何

使用C++实现链表元素的反转

《使用C++实现链表元素的反转》反转链表是链表操作中一个经典的问题,也是面试中常见的考题,本文将从思路到实现一步步地讲解如何实现链表的反转,帮助初学者理解这一操作,我们将使用C++代码演示具体实现,同... 目录问题定义思路分析代码实现带头节点的链表代码讲解其他实现方式时间和空间复杂度分析总结问题定义给定

IDEA编译报错“java: 常量字符串过长”的原因及解决方法

《IDEA编译报错“java:常量字符串过长”的原因及解决方法》今天在开发过程中,由于尝试将一个文件的Base64字符串设置为常量,结果导致IDEA编译的时候出现了如下报错java:常量字符串过长,... 目录一、问题描述二、问题原因2.1 理论角度2.2 源码角度三、解决方案解决方案①:StringBui

Linux使用nload监控网络流量的方法

《Linux使用nload监控网络流量的方法》Linux中的nload命令是一个用于实时监控网络流量的工具,它提供了传入和传出流量的可视化表示,帮助用户一目了然地了解网络活动,本文给大家介绍了Linu... 目录简介安装示例用法基础用法指定网络接口限制显示特定流量类型指定刷新率设置流量速率的显示单位监控多个

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

Java中ArrayList和LinkedList有什么区别举例详解

《Java中ArrayList和LinkedList有什么区别举例详解》:本文主要介绍Java中ArrayList和LinkedList区别的相关资料,包括数据结构特性、核心操作性能、内存与GC影... 目录一、底层数据结构二、核心操作性能对比三、内存与 GC 影响四、扩容机制五、线程安全与并发方案六、工程