Tx-lcn分布式事务框架初体验

2023-12-24 10:38

本文主要是介绍Tx-lcn分布式事务框架初体验,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Tx-lcn分布式事务框架初体验

  • 架构学习
    • 底层通讯
      • 客户端注册
      • 事务消息通知
  • 事务开启注解类型
  • 事务锁底层实现

架构学习

TX-LCN 由两大模块组成,TxClient、TxManager
TxManager 独立服务部署
TxClient 作为模块的依赖框架,提供了 TX-LCN 的标准支持,事务发起方和参与方都属于 TxClient
在这里插入图片描述

底层通讯

客户端注册

http协议进行客户端注册并获取netty通信地址

所属类:com.codingapi.tx.config.ConfigReaderpublic String getTxUrl() {try {txManagerTxUrlService =  spring.getBean(TxManagerTxUrlService.class);}catch (Exception e){logger.debug("load default txManagerTxUrlService ");}if(txManagerTxUrlService == null){txManagerTxUrlService = new TxManagerTxUrlService() {private final String configName = "tx.properties";private final String configKey = "url";@Overridepublic String getTxUrl() {return ConfigUtils.getString(configName,configKey);}};logger.debug("load default txManagerTxUrlService");}else{logger.debug("load txManagerTxUrlService");}return txManagerTxUrlService.getTxUrl();//这个类需要自己根据业务实现}
所属类:com.codingapi.tx.netty.service.impl.MQTxManagerServiceImpl
@Override
public String httpGetServer() {String url = configReader.getTxUrl() + "getServer";return managerHelper.httpGet(url);
}
所属类:com.codingapi.tx.netty.service.impl.MQTxManagerServiceImpl
@Override
public String httpGetServer() {String url = configReader.getTxUrl() + "getServer";return managerHelper.httpGet(url);
}

事务消息通知

底层消息事务通知使用netty-tcp进行通信

所属类:com.codingapi.tx.netty.service.impl.NettyDistributeServiceImplprivate void getTxServer() {//获取负载均衡服务地址String json = null;while (StringUtils.isEmpty(json)) {json = txManagerService.httpGetServer();logger.info("get txManager ->" + json);if (StringUtils.isEmpty(json)) {logger.error("TxManager服务器无法访问.");try {Thread.sleep(1000 * 2);} catch (InterruptedException e) {e.printStackTrace();}}}TxServer txServer = TxServer.parser(json);if (txServer != null) {logger.debug("txServer -> " + txServer);logger.info(txServer.toString());Constants.txServer = txServer;logger.info(Constants.txServer.toString());connectCont = 0;}}
所属类:com.codingapi.tx.netty.service.impl.NettyServiceImpl@Overridepublic synchronized void start() {if (isStarting) {return;}isStarting = true;nettyDistributeService.loadTxServer();String host = Constants.txServer.getHost();int port = Constants.txServer.getPort();final int heart = Constants.txServer.getHeart();int delay = Constants.txServer.getDelay();final TransactionHandler transactionHandler = new TransactionHandler(threadPool,nettyControlService, delay);workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap(); // (1)b.group(workerGroup); // (2)b.channel(NioSocketChannel.class); // (3)b.option(ChannelOption.SO_KEEPALIVE, true); // (4)b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("timeout", new IdleStateHandler(heart, heart, heart, TimeUnit.SECONDS));ch.pipeline().addLast(new LengthFieldPrepender(4, false));ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));ch.pipeline().addLast(transactionHandler);}});// Start the client.logger.info("connection txManager-socket-> host:" + host + ",port:" + port);ChannelFuture future = b.connect(host, port); // (5)future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {if (!channelFuture.isSuccess()) {channelFuture.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {isStarting = false;start();}}, 5, TimeUnit.SECONDS);}}});} catch (Exception e) {logger.error(e.getLocalizedMessage());}}

事务开启注解类型

@TxTransaction(isStart = false/true)
1.使用切面来进行事务控制

public class AspectBeforeServiceImpl implements AspectBeforeService {@Autowiredprivate TransactionServerFactoryService transactionServerFactoryService;private Logger logger = LoggerFactory.getLogger(AspectBeforeServiceImpl.class);public Object around(String groupId, ProceedingJoinPoint point) throws Throwable {MethodSignature signature = (MethodSignature) point.getSignature();Method method = signature.getMethod();Class<?> clazz = point.getTarget().getClass();Object[] args = point.getArgs();Method thisMethod = clazz.getMethod(method.getName(), method.getParameterTypes());TxTransaction transaction = thisMethod.getAnnotation(TxTransaction.class);TxTransactionLocal txTransactionLocal = TxTransactionLocal.current();logger.debug("around--> groupId-> " +groupId+",txTransactionLocal->"+txTransactionLocal);TransactionInvocation invocation = new TransactionInvocation(clazz, thisMethod.getName(), thisMethod.toString(), args, method.getParameterTypes());TxTransactionInfo info = new TxTransactionInfo(transaction,txTransactionLocal,invocation,groupId);TransactionServer server = transactionServerFactoryService.createTransactionServer(info);return server.execute(point, info);//事务控制}
}

2.事务控制类包含三个实现
事务控制接口类:com.codingapi.tx.aop.service.TransactionServer
实现:
在这里插入图片描述

事务锁底层实现

事务锁任务task中使用lock 和 condition 条件锁,主要是通过条件锁等待进行阻塞,收到消息后通知解锁来进行事务控制,事务的最终控制是在收到回滚消息后抛出异常实现:

收到netty消息:

消息业务处理实现类:com.codingapi.tx.netty.service.impl.NettyControlServiceImpl@Overridepublic void executeService(final ChannelHandlerContext ctx,final String json) {if (StringUtils.isNotEmpty(json)) {JSONObject resObj = JSONObject.parseObject(json);if (resObj.containsKey("a")) {// tm发送数据给tx模块的处理指令transactionControlService.notifyTransactionMsg(ctx,resObj,json);}else{//tx发送数据给tm的响应返回数据String key = resObj.getString("k");responseMsg(key,resObj);}}}
------------------>通知消息@Overridepublic void notifyTransactionMsg(ChannelHandlerContext ctx,JSONObject resObj, String json) {String action = resObj.getString("a");String key = resObj.getString("k");IActionService actionService = spring.getBean(action, IActionService.class);String res = actionService.execute(resObj, json);JSONObject data = new JSONObject();data.put("k", key);data.put("a", action);JSONObject params = new JSONObject();params.put("d", res);data.put("p", params);SocketUtils.sendMsg(ctx, data.toString());logger.debug("send notify data ->" + data.toString());}
----------------------->通知更新任务状态,并解锁@Overridepublic String execute(JSONObject resObj, String json) {String res;//通知提醒final int state = resObj.getInteger("c");String taskId = resObj.getString("t");if(transactionControl.executeTransactionOperation()) {TaskGroup task = TaskGroupManager.getInstance().getTaskGroup(taskId);logger.info("accept notify data ->" + json);if (task != null) {if (task.isAwait()) {   //已经等待res = notifyWaitTask(task, state);} else {int index = 0;while (true) {if (index > 500) {res = "0";break;}if (task.isAwait()) {   //已经等待res = notifyWaitTask(task, state);break;}index++;try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}} else {res = "0";}}else{//非事务操作res = "1";transactionControl.autoNoTransactionOperation();}logger.info("accept notify response res ->" + res);return res;}---------------------->更新task状态,并解锁private String notifyWaitTask(TaskGroup task, int state) {String res;task.setState(state);task.signalTask();int count = 0;while (true) {if (task.isRemove()) {if (task.getState() == TaskState.rollback.getCode()|| task.getState() == TaskState.commit.getCode()) {res = "1";} else {res = "0";}break;}if (count > 1000) {//已经通知了,有可能失败.res = "2";break;}count++;try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}return res;}------------------>事务切面处理收到解锁消息,抛出异常@Overridepublic Object execute(final ProceedingJoinPoint point, final TxTransactionInfo info) throws Throwable {String kid = KidUtils.generateShortUuid();String txGroupId = info.getTxGroupId();logger.debug("--->begin running transaction,groupId:" + txGroupId);long t1 = System.currentTimeMillis();boolean isHasIsGroup =  transactionControl.hasGroup(txGroupId);TxTransactionLocal txTransactionLocal = new TxTransactionLocal();txTransactionLocal.setGroupId(txGroupId);txTransactionLocal.setHasStart(false);txTransactionLocal.setKid(kid);txTransactionLocal.setHasIsGroup(isHasIsGroup);txTransactionLocal.setMaxTimeOut(Constants.txServer.getCompensateMaxWaitTime());TxTransactionLocal.setCurrent(txTransactionLocal);try {Object res = point.proceed();//写操作 处理if(!txTransactionLocal.isReadOnly()) {String methodStr = info.getInvocation().getMethodStr();TxGroup resTxGroup = txManagerService.addTransactionGroup(txGroupId, kid, isHasIsGroup, methodStr);//已经进入过该模块的,不再执行此方法if(!isHasIsGroup) {String type = txTransactionLocal.getType();TxTask waitTask = TaskGroupManager.getInstance().getTask(kid, type);//lcn 连接已经开始等待时.//等待锁释放while (waitTask != null && !waitTask.isAwait()) {TimeUnit.MILLISECONDS.sleep(1);}if (resTxGroup == null) {//通知业务回滚事务if (waitTask != null) {//修改事务组状态异常waitTask.setState(-1);waitTask.signalTask();//抛出异常消息回滚throw new ServiceException("update TxGroup error, groupId:" + txGroupId);}}}}return res;} catch (Throwable e) {throw e;} finally {TxTransactionLocal.setCurrent(null);long t2 = System.currentTimeMillis();logger.debug("<---end running transaction,groupId:" + txGroupId+",execute time:"+(t2-t1));}}

这篇关于Tx-lcn分布式事务框架初体验的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/531502

相关文章

java如何分布式锁实现和选型

《java如何分布式锁实现和选型》文章介绍了分布式锁的重要性以及在分布式系统中常见的问题和需求,它详细阐述了如何使用分布式锁来确保数据的一致性和系统的高可用性,文章还提供了基于数据库、Redis和Zo... 目录引言:分布式锁的重要性与分布式系统中的常见问题和需求分布式锁的重要性分布式系统中常见的问题和需求

Redis事务与数据持久化方式

《Redis事务与数据持久化方式》该文档主要介绍了Redis事务和持久化机制,事务通过将多个命令打包执行,而持久化则通过快照(RDB)和追加式文件(AOF)两种方式将内存数据保存到磁盘,以防止数据丢失... 目录一、Redis 事务1.1 事务本质1.2 数据库事务与redis事务1.2.1 数据库事务1.

Golang使用etcd构建分布式锁的示例分享

《Golang使用etcd构建分布式锁的示例分享》在本教程中,我们将学习如何使用Go和etcd构建分布式锁系统,分布式锁系统对于管理对分布式系统中共享资源的并发访问至关重要,它有助于维护一致性,防止竞... 目录引言环境准备新建Go项目实现加锁和解锁功能测试分布式锁重构实现失败重试总结引言我们将使用Go作

Redis分布式锁使用及说明

《Redis分布式锁使用及说明》本文总结了Redis和Zookeeper在高可用性和高一致性场景下的应用,并详细介绍了Redis的分布式锁实现方式,包括使用Lua脚本和续期机制,最后,提到了RedLo... 目录Redis分布式锁加锁方式怎么会解错锁?举个小案例吧解锁方式续期总结Redis分布式锁如果追求

SpringBoot嵌套事务详解及失效解决方案

《SpringBoot嵌套事务详解及失效解决方案》在复杂的业务场景中,嵌套事务可以帮助我们更加精细地控制数据的一致性,然而,在SpringBoot中,如果嵌套事务的配置不当,可能会导致事务不生效的问题... 目录什么是嵌套事务?嵌套事务失效的原因核心问题:嵌套事务的解决方案方案一:将嵌套事务方法提取到独立类

MyBatis框架实现一个简单的数据查询操作

《MyBatis框架实现一个简单的数据查询操作》本文介绍了MyBatis框架下进行数据查询操作的详细步骤,括创建实体类、编写SQL标签、配置Mapper、开启驼峰命名映射以及执行SQL语句等,感兴趣的... 基于在前面几章我们已经学习了对MyBATis进行环境配置,并利用SqlSessionFactory核

cross-plateform 跨平台应用程序-03-如果只选择一个框架,应该选择哪一个?

跨平台系列 cross-plateform 跨平台应用程序-01-概览 cross-plateform 跨平台应用程序-02-有哪些主流技术栈? cross-plateform 跨平台应用程序-03-如果只选择一个框架,应该选择哪一个? cross-plateform 跨平台应用程序-04-React Native 介绍 cross-plateform 跨平台应用程序-05-Flutte

Spring框架5 - 容器的扩展功能 (ApplicationContext)

private static ApplicationContext applicationContext;static {applicationContext = new ClassPathXmlApplicationContext("bean.xml");} BeanFactory的功能扩展类ApplicationContext进行深度的分析。ApplicationConext与 BeanF

数据治理框架-ISO数据治理标准

引言 "数据治理"并不是一个新的概念,国内外有很多组织专注于数据治理理论和实践的研究。目前国际上,主要的数据治理框架有ISO数据治理标准、GDI数据治理框架、DAMA数据治理管理框架等。 ISO数据治理标准 改标准阐述了数据治理的标准、基本原则和数据治理模型,是一套完整的数据治理方法论。 ISO/IEC 38505标准的数据治理方法论的核心内容如下: 数据治理的目标:促进组织高效、合理地

ZooKeeper 中的 Curator 框架解析

Apache ZooKeeper 是一个为分布式应用提供一致性服务的软件。它提供了诸如配置管理、分布式同步、组服务等功能。在使用 ZooKeeper 时,Curator 是一个非常流行的客户端库,它简化了 ZooKeeper 的使用,提供了高级的抽象和丰富的工具。本文将详细介绍 Curator 框架,包括它的设计哲学、核心组件以及如何使用 Curator 来简化 ZooKeeper 的操作。 1