Tx-lcn分布式事务框架初体验

2023-12-24 10:38

本文主要是介绍Tx-lcn分布式事务框架初体验,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Tx-lcn分布式事务框架初体验

  • 架构学习
    • 底层通讯
      • 客户端注册
      • 事务消息通知
  • 事务开启注解类型
  • 事务锁底层实现

架构学习

TX-LCN 由两大模块组成,TxClient、TxManager
TxManager 独立服务部署
TxClient 作为模块的依赖框架,提供了 TX-LCN 的标准支持,事务发起方和参与方都属于 TxClient
在这里插入图片描述

底层通讯

客户端注册

http协议进行客户端注册并获取netty通信地址

所属类:com.codingapi.tx.config.ConfigReaderpublic String getTxUrl() {try {txManagerTxUrlService =  spring.getBean(TxManagerTxUrlService.class);}catch (Exception e){logger.debug("load default txManagerTxUrlService ");}if(txManagerTxUrlService == null){txManagerTxUrlService = new TxManagerTxUrlService() {private final String configName = "tx.properties";private final String configKey = "url";@Overridepublic String getTxUrl() {return ConfigUtils.getString(configName,configKey);}};logger.debug("load default txManagerTxUrlService");}else{logger.debug("load txManagerTxUrlService");}return txManagerTxUrlService.getTxUrl();//这个类需要自己根据业务实现}
所属类:com.codingapi.tx.netty.service.impl.MQTxManagerServiceImpl
@Override
public String httpGetServer() {String url = configReader.getTxUrl() + "getServer";return managerHelper.httpGet(url);
}
所属类:com.codingapi.tx.netty.service.impl.MQTxManagerServiceImpl
@Override
public String httpGetServer() {String url = configReader.getTxUrl() + "getServer";return managerHelper.httpGet(url);
}

事务消息通知

底层消息事务通知使用netty-tcp进行通信

所属类:com.codingapi.tx.netty.service.impl.NettyDistributeServiceImplprivate void getTxServer() {//获取负载均衡服务地址String json = null;while (StringUtils.isEmpty(json)) {json = txManagerService.httpGetServer();logger.info("get txManager ->" + json);if (StringUtils.isEmpty(json)) {logger.error("TxManager服务器无法访问.");try {Thread.sleep(1000 * 2);} catch (InterruptedException e) {e.printStackTrace();}}}TxServer txServer = TxServer.parser(json);if (txServer != null) {logger.debug("txServer -> " + txServer);logger.info(txServer.toString());Constants.txServer = txServer;logger.info(Constants.txServer.toString());connectCont = 0;}}
所属类:com.codingapi.tx.netty.service.impl.NettyServiceImpl@Overridepublic synchronized void start() {if (isStarting) {return;}isStarting = true;nettyDistributeService.loadTxServer();String host = Constants.txServer.getHost();int port = Constants.txServer.getPort();final int heart = Constants.txServer.getHeart();int delay = Constants.txServer.getDelay();final TransactionHandler transactionHandler = new TransactionHandler(threadPool,nettyControlService, delay);workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap(); // (1)b.group(workerGroup); // (2)b.channel(NioSocketChannel.class); // (3)b.option(ChannelOption.SO_KEEPALIVE, true); // (4)b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("timeout", new IdleStateHandler(heart, heart, heart, TimeUnit.SECONDS));ch.pipeline().addLast(new LengthFieldPrepender(4, false));ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));ch.pipeline().addLast(transactionHandler);}});// Start the client.logger.info("connection txManager-socket-> host:" + host + ",port:" + port);ChannelFuture future = b.connect(host, port); // (5)future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {if (!channelFuture.isSuccess()) {channelFuture.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {isStarting = false;start();}}, 5, TimeUnit.SECONDS);}}});} catch (Exception e) {logger.error(e.getLocalizedMessage());}}

事务开启注解类型

@TxTransaction(isStart = false/true)
1.使用切面来进行事务控制

public class AspectBeforeServiceImpl implements AspectBeforeService {@Autowiredprivate TransactionServerFactoryService transactionServerFactoryService;private Logger logger = LoggerFactory.getLogger(AspectBeforeServiceImpl.class);public Object around(String groupId, ProceedingJoinPoint point) throws Throwable {MethodSignature signature = (MethodSignature) point.getSignature();Method method = signature.getMethod();Class<?> clazz = point.getTarget().getClass();Object[] args = point.getArgs();Method thisMethod = clazz.getMethod(method.getName(), method.getParameterTypes());TxTransaction transaction = thisMethod.getAnnotation(TxTransaction.class);TxTransactionLocal txTransactionLocal = TxTransactionLocal.current();logger.debug("around--> groupId-> " +groupId+",txTransactionLocal->"+txTransactionLocal);TransactionInvocation invocation = new TransactionInvocation(clazz, thisMethod.getName(), thisMethod.toString(), args, method.getParameterTypes());TxTransactionInfo info = new TxTransactionInfo(transaction,txTransactionLocal,invocation,groupId);TransactionServer server = transactionServerFactoryService.createTransactionServer(info);return server.execute(point, info);//事务控制}
}

2.事务控制类包含三个实现
事务控制接口类:com.codingapi.tx.aop.service.TransactionServer
实现:
在这里插入图片描述

事务锁底层实现

事务锁任务task中使用lock 和 condition 条件锁,主要是通过条件锁等待进行阻塞,收到消息后通知解锁来进行事务控制,事务的最终控制是在收到回滚消息后抛出异常实现:

收到netty消息:

消息业务处理实现类:com.codingapi.tx.netty.service.impl.NettyControlServiceImpl@Overridepublic void executeService(final ChannelHandlerContext ctx,final String json) {if (StringUtils.isNotEmpty(json)) {JSONObject resObj = JSONObject.parseObject(json);if (resObj.containsKey("a")) {// tm发送数据给tx模块的处理指令transactionControlService.notifyTransactionMsg(ctx,resObj,json);}else{//tx发送数据给tm的响应返回数据String key = resObj.getString("k");responseMsg(key,resObj);}}}
------------------>通知消息@Overridepublic void notifyTransactionMsg(ChannelHandlerContext ctx,JSONObject resObj, String json) {String action = resObj.getString("a");String key = resObj.getString("k");IActionService actionService = spring.getBean(action, IActionService.class);String res = actionService.execute(resObj, json);JSONObject data = new JSONObject();data.put("k", key);data.put("a", action);JSONObject params = new JSONObject();params.put("d", res);data.put("p", params);SocketUtils.sendMsg(ctx, data.toString());logger.debug("send notify data ->" + data.toString());}
----------------------->通知更新任务状态,并解锁@Overridepublic String execute(JSONObject resObj, String json) {String res;//通知提醒final int state = resObj.getInteger("c");String taskId = resObj.getString("t");if(transactionControl.executeTransactionOperation()) {TaskGroup task = TaskGroupManager.getInstance().getTaskGroup(taskId);logger.info("accept notify data ->" + json);if (task != null) {if (task.isAwait()) {   //已经等待res = notifyWaitTask(task, state);} else {int index = 0;while (true) {if (index > 500) {res = "0";break;}if (task.isAwait()) {   //已经等待res = notifyWaitTask(task, state);break;}index++;try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}} else {res = "0";}}else{//非事务操作res = "1";transactionControl.autoNoTransactionOperation();}logger.info("accept notify response res ->" + res);return res;}---------------------->更新task状态,并解锁private String notifyWaitTask(TaskGroup task, int state) {String res;task.setState(state);task.signalTask();int count = 0;while (true) {if (task.isRemove()) {if (task.getState() == TaskState.rollback.getCode()|| task.getState() == TaskState.commit.getCode()) {res = "1";} else {res = "0";}break;}if (count > 1000) {//已经通知了,有可能失败.res = "2";break;}count++;try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}return res;}------------------>事务切面处理收到解锁消息,抛出异常@Overridepublic Object execute(final ProceedingJoinPoint point, final TxTransactionInfo info) throws Throwable {String kid = KidUtils.generateShortUuid();String txGroupId = info.getTxGroupId();logger.debug("--->begin running transaction,groupId:" + txGroupId);long t1 = System.currentTimeMillis();boolean isHasIsGroup =  transactionControl.hasGroup(txGroupId);TxTransactionLocal txTransactionLocal = new TxTransactionLocal();txTransactionLocal.setGroupId(txGroupId);txTransactionLocal.setHasStart(false);txTransactionLocal.setKid(kid);txTransactionLocal.setHasIsGroup(isHasIsGroup);txTransactionLocal.setMaxTimeOut(Constants.txServer.getCompensateMaxWaitTime());TxTransactionLocal.setCurrent(txTransactionLocal);try {Object res = point.proceed();//写操作 处理if(!txTransactionLocal.isReadOnly()) {String methodStr = info.getInvocation().getMethodStr();TxGroup resTxGroup = txManagerService.addTransactionGroup(txGroupId, kid, isHasIsGroup, methodStr);//已经进入过该模块的,不再执行此方法if(!isHasIsGroup) {String type = txTransactionLocal.getType();TxTask waitTask = TaskGroupManager.getInstance().getTask(kid, type);//lcn 连接已经开始等待时.//等待锁释放while (waitTask != null && !waitTask.isAwait()) {TimeUnit.MILLISECONDS.sleep(1);}if (resTxGroup == null) {//通知业务回滚事务if (waitTask != null) {//修改事务组状态异常waitTask.setState(-1);waitTask.signalTask();//抛出异常消息回滚throw new ServiceException("update TxGroup error, groupId:" + txGroupId);}}}}return res;} catch (Throwable e) {throw e;} finally {TxTransactionLocal.setCurrent(null);long t2 = System.currentTimeMillis();logger.debug("<---end running transaction,groupId:" + txGroupId+",execute time:"+(t2-t1));}}

这篇关于Tx-lcn分布式事务框架初体验的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/531502

相关文章

cross-plateform 跨平台应用程序-03-如果只选择一个框架,应该选择哪一个?

跨平台系列 cross-plateform 跨平台应用程序-01-概览 cross-plateform 跨平台应用程序-02-有哪些主流技术栈? cross-plateform 跨平台应用程序-03-如果只选择一个框架,应该选择哪一个? cross-plateform 跨平台应用程序-04-React Native 介绍 cross-plateform 跨平台应用程序-05-Flutte

Spring框架5 - 容器的扩展功能 (ApplicationContext)

private static ApplicationContext applicationContext;static {applicationContext = new ClassPathXmlApplicationContext("bean.xml");} BeanFactory的功能扩展类ApplicationContext进行深度的分析。ApplicationConext与 BeanF

数据治理框架-ISO数据治理标准

引言 "数据治理"并不是一个新的概念,国内外有很多组织专注于数据治理理论和实践的研究。目前国际上,主要的数据治理框架有ISO数据治理标准、GDI数据治理框架、DAMA数据治理管理框架等。 ISO数据治理标准 改标准阐述了数据治理的标准、基本原则和数据治理模型,是一套完整的数据治理方法论。 ISO/IEC 38505标准的数据治理方法论的核心内容如下: 数据治理的目标:促进组织高效、合理地

ZooKeeper 中的 Curator 框架解析

Apache ZooKeeper 是一个为分布式应用提供一致性服务的软件。它提供了诸如配置管理、分布式同步、组服务等功能。在使用 ZooKeeper 时,Curator 是一个非常流行的客户端库,它简化了 ZooKeeper 的使用,提供了高级的抽象和丰富的工具。本文将详细介绍 Curator 框架,包括它的设计哲学、核心组件以及如何使用 Curator 来简化 ZooKeeper 的操作。 1

集中式版本控制与分布式版本控制——Git 学习笔记01

什么是版本控制 如果你用 Microsoft Word 写过东西,那你八成会有这样的经历: 想删除一段文字,又怕将来这段文字有用,怎么办呢?有一个办法,先把当前文件“另存为”一个文件,然后继续改,改到某个程度,再“另存为”一个文件。就这样改着、存着……最后你的 Word 文档变成了这样: 过了几天,你想找回被删除的文字,但是已经记不清保存在哪个文件了,只能挨个去找。真麻烦,眼睛都花了。看

【Kubernetes】K8s 的安全框架和用户认证

K8s 的安全框架和用户认证 1.Kubernetes 的安全框架1.1 认证:Authentication1.2 鉴权:Authorization1.3 准入控制:Admission Control 2.Kubernetes 的用户认证2.1 Kubernetes 的用户认证方式2.2 配置 Kubernetes 集群使用密码认证 Kubernetes 作为一个分布式的虚拟

Spring Framework系统框架

序号表示的是学习顺序 IoC(控制反转)/DI(依赖注入): ioc:思想上是控制反转,spring提供了一个容器,称为IOC容器,用它来充当IOC思想中的外部。 我的理解就是spring把这些对象集中管理,放在容器中,这个容器就叫Ioc这些对象统称为Bean 用对象的时候不用new,直接外部提供(bean) 当外部的对象有关系的时候,IOC给它俩绑好(DI) DI和IO

MySql 事务练习

事务(transaction) -- 事务 transaction-- 事务是一组操作的集合,是一个不可分割的工作单位,事务会将所有的操作作为一个整体一起向系统提交或撤销请求-- 事务的操作要么同时成功,要么同时失败-- MySql的事务默认是自动提交的,当执行一个DML语句,MySql会立即自动隐式提交事务-- 常见案例:银行转账-- 逻辑:A给B转账1000:1.查询

开源分布式数据库中间件

转自:https://www.csdn.net/article/2015-07-16/2825228 MyCat:开源分布式数据库中间件 为什么需要MyCat? 虽然云计算时代,传统数据库存在着先天性的弊端,但是NoSQL数据库又无法将其替代。如果传统数据易于扩展,可切分,就可以避免单机(单库)的性能缺陷。 MyCat的目标就是:低成本地将现有的单机数据库和应用平滑迁移到“云”端

Sentinel 高可用流量管理框架

Sentinel 是面向分布式服务架构的高可用流量防护组件,主要以流量为切入点,从限流、流量整形、熔断降级、系统负载保护、热点防护等多个维度来帮助开发者保障微服务的稳定性。 Sentinel 具有以下特性: 丰富的应用场景:Sentinel 承接了阿里巴巴近 10 年的双十一大促流量的核心场景,例如秒杀(即突发流量控制在系统容量可以承受的范围)、消息削峰填谷、集群流量控制、实时熔断下游不可用应