Tx-lcn分布式事务框架初体验

2023-12-24 10:38

本文主要是介绍Tx-lcn分布式事务框架初体验,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Tx-lcn分布式事务框架初体验

  • 架构学习
    • 底层通讯
      • 客户端注册
      • 事务消息通知
  • 事务开启注解类型
  • 事务锁底层实现

架构学习

TX-LCN 由两大模块组成,TxClient、TxManager
TxManager 独立服务部署
TxClient 作为模块的依赖框架,提供了 TX-LCN 的标准支持,事务发起方和参与方都属于 TxClient
在这里插入图片描述

底层通讯

客户端注册

http协议进行客户端注册并获取netty通信地址

所属类:com.codingapi.tx.config.ConfigReaderpublic String getTxUrl() {try {txManagerTxUrlService =  spring.getBean(TxManagerTxUrlService.class);}catch (Exception e){logger.debug("load default txManagerTxUrlService ");}if(txManagerTxUrlService == null){txManagerTxUrlService = new TxManagerTxUrlService() {private final String configName = "tx.properties";private final String configKey = "url";@Overridepublic String getTxUrl() {return ConfigUtils.getString(configName,configKey);}};logger.debug("load default txManagerTxUrlService");}else{logger.debug("load txManagerTxUrlService");}return txManagerTxUrlService.getTxUrl();//这个类需要自己根据业务实现}
所属类:com.codingapi.tx.netty.service.impl.MQTxManagerServiceImpl
@Override
public String httpGetServer() {String url = configReader.getTxUrl() + "getServer";return managerHelper.httpGet(url);
}
所属类:com.codingapi.tx.netty.service.impl.MQTxManagerServiceImpl
@Override
public String httpGetServer() {String url = configReader.getTxUrl() + "getServer";return managerHelper.httpGet(url);
}

事务消息通知

底层消息事务通知使用netty-tcp进行通信

所属类:com.codingapi.tx.netty.service.impl.NettyDistributeServiceImplprivate void getTxServer() {//获取负载均衡服务地址String json = null;while (StringUtils.isEmpty(json)) {json = txManagerService.httpGetServer();logger.info("get txManager ->" + json);if (StringUtils.isEmpty(json)) {logger.error("TxManager服务器无法访问.");try {Thread.sleep(1000 * 2);} catch (InterruptedException e) {e.printStackTrace();}}}TxServer txServer = TxServer.parser(json);if (txServer != null) {logger.debug("txServer -> " + txServer);logger.info(txServer.toString());Constants.txServer = txServer;logger.info(Constants.txServer.toString());connectCont = 0;}}
所属类:com.codingapi.tx.netty.service.impl.NettyServiceImpl@Overridepublic synchronized void start() {if (isStarting) {return;}isStarting = true;nettyDistributeService.loadTxServer();String host = Constants.txServer.getHost();int port = Constants.txServer.getPort();final int heart = Constants.txServer.getHeart();int delay = Constants.txServer.getDelay();final TransactionHandler transactionHandler = new TransactionHandler(threadPool,nettyControlService, delay);workerGroup = new NioEventLoopGroup();try {Bootstrap b = new Bootstrap(); // (1)b.group(workerGroup); // (2)b.channel(NioSocketChannel.class); // (3)b.option(ChannelOption.SO_KEEPALIVE, true); // (4)b.handler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast("timeout", new IdleStateHandler(heart, heart, heart, TimeUnit.SECONDS));ch.pipeline().addLast(new LengthFieldPrepender(4, false));ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));ch.pipeline().addLast(transactionHandler);}});// Start the client.logger.info("connection txManager-socket-> host:" + host + ",port:" + port);ChannelFuture future = b.connect(host, port); // (5)future.addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture channelFuture) throws Exception {if (!channelFuture.isSuccess()) {channelFuture.channel().eventLoop().schedule(new Runnable() {@Overridepublic void run() {isStarting = false;start();}}, 5, TimeUnit.SECONDS);}}});} catch (Exception e) {logger.error(e.getLocalizedMessage());}}

事务开启注解类型

@TxTransaction(isStart = false/true)
1.使用切面来进行事务控制

public class AspectBeforeServiceImpl implements AspectBeforeService {@Autowiredprivate TransactionServerFactoryService transactionServerFactoryService;private Logger logger = LoggerFactory.getLogger(AspectBeforeServiceImpl.class);public Object around(String groupId, ProceedingJoinPoint point) throws Throwable {MethodSignature signature = (MethodSignature) point.getSignature();Method method = signature.getMethod();Class<?> clazz = point.getTarget().getClass();Object[] args = point.getArgs();Method thisMethod = clazz.getMethod(method.getName(), method.getParameterTypes());TxTransaction transaction = thisMethod.getAnnotation(TxTransaction.class);TxTransactionLocal txTransactionLocal = TxTransactionLocal.current();logger.debug("around--> groupId-> " +groupId+",txTransactionLocal->"+txTransactionLocal);TransactionInvocation invocation = new TransactionInvocation(clazz, thisMethod.getName(), thisMethod.toString(), args, method.getParameterTypes());TxTransactionInfo info = new TxTransactionInfo(transaction,txTransactionLocal,invocation,groupId);TransactionServer server = transactionServerFactoryService.createTransactionServer(info);return server.execute(point, info);//事务控制}
}

2.事务控制类包含三个实现
事务控制接口类:com.codingapi.tx.aop.service.TransactionServer
实现:
在这里插入图片描述

事务锁底层实现

事务锁任务task中使用lock 和 condition 条件锁,主要是通过条件锁等待进行阻塞,收到消息后通知解锁来进行事务控制,事务的最终控制是在收到回滚消息后抛出异常实现:

收到netty消息:

消息业务处理实现类:com.codingapi.tx.netty.service.impl.NettyControlServiceImpl@Overridepublic void executeService(final ChannelHandlerContext ctx,final String json) {if (StringUtils.isNotEmpty(json)) {JSONObject resObj = JSONObject.parseObject(json);if (resObj.containsKey("a")) {// tm发送数据给tx模块的处理指令transactionControlService.notifyTransactionMsg(ctx,resObj,json);}else{//tx发送数据给tm的响应返回数据String key = resObj.getString("k");responseMsg(key,resObj);}}}
------------------>通知消息@Overridepublic void notifyTransactionMsg(ChannelHandlerContext ctx,JSONObject resObj, String json) {String action = resObj.getString("a");String key = resObj.getString("k");IActionService actionService = spring.getBean(action, IActionService.class);String res = actionService.execute(resObj, json);JSONObject data = new JSONObject();data.put("k", key);data.put("a", action);JSONObject params = new JSONObject();params.put("d", res);data.put("p", params);SocketUtils.sendMsg(ctx, data.toString());logger.debug("send notify data ->" + data.toString());}
----------------------->通知更新任务状态,并解锁@Overridepublic String execute(JSONObject resObj, String json) {String res;//通知提醒final int state = resObj.getInteger("c");String taskId = resObj.getString("t");if(transactionControl.executeTransactionOperation()) {TaskGroup task = TaskGroupManager.getInstance().getTaskGroup(taskId);logger.info("accept notify data ->" + json);if (task != null) {if (task.isAwait()) {   //已经等待res = notifyWaitTask(task, state);} else {int index = 0;while (true) {if (index > 500) {res = "0";break;}if (task.isAwait()) {   //已经等待res = notifyWaitTask(task, state);break;}index++;try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}}} else {res = "0";}}else{//非事务操作res = "1";transactionControl.autoNoTransactionOperation();}logger.info("accept notify response res ->" + res);return res;}---------------------->更新task状态,并解锁private String notifyWaitTask(TaskGroup task, int state) {String res;task.setState(state);task.signalTask();int count = 0;while (true) {if (task.isRemove()) {if (task.getState() == TaskState.rollback.getCode()|| task.getState() == TaskState.commit.getCode()) {res = "1";} else {res = "0";}break;}if (count > 1000) {//已经通知了,有可能失败.res = "2";break;}count++;try {Thread.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}return res;}------------------>事务切面处理收到解锁消息,抛出异常@Overridepublic Object execute(final ProceedingJoinPoint point, final TxTransactionInfo info) throws Throwable {String kid = KidUtils.generateShortUuid();String txGroupId = info.getTxGroupId();logger.debug("--->begin running transaction,groupId:" + txGroupId);long t1 = System.currentTimeMillis();boolean isHasIsGroup =  transactionControl.hasGroup(txGroupId);TxTransactionLocal txTransactionLocal = new TxTransactionLocal();txTransactionLocal.setGroupId(txGroupId);txTransactionLocal.setHasStart(false);txTransactionLocal.setKid(kid);txTransactionLocal.setHasIsGroup(isHasIsGroup);txTransactionLocal.setMaxTimeOut(Constants.txServer.getCompensateMaxWaitTime());TxTransactionLocal.setCurrent(txTransactionLocal);try {Object res = point.proceed();//写操作 处理if(!txTransactionLocal.isReadOnly()) {String methodStr = info.getInvocation().getMethodStr();TxGroup resTxGroup = txManagerService.addTransactionGroup(txGroupId, kid, isHasIsGroup, methodStr);//已经进入过该模块的,不再执行此方法if(!isHasIsGroup) {String type = txTransactionLocal.getType();TxTask waitTask = TaskGroupManager.getInstance().getTask(kid, type);//lcn 连接已经开始等待时.//等待锁释放while (waitTask != null && !waitTask.isAwait()) {TimeUnit.MILLISECONDS.sleep(1);}if (resTxGroup == null) {//通知业务回滚事务if (waitTask != null) {//修改事务组状态异常waitTask.setState(-1);waitTask.signalTask();//抛出异常消息回滚throw new ServiceException("update TxGroup error, groupId:" + txGroupId);}}}}return res;} catch (Throwable e) {throw e;} finally {TxTransactionLocal.setCurrent(null);long t2 = System.currentTimeMillis();logger.debug("<---end running transaction,groupId:" + txGroupId+",execute time:"+(t2-t1));}}

这篇关于Tx-lcn分布式事务框架初体验的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/531502

相关文章

Spring 中的切面与事务结合使用完整示例

《Spring中的切面与事务结合使用完整示例》本文给大家介绍Spring中的切面与事务结合使用完整示例,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考... 目录 一、前置知识:Spring AOP 与 事务的关系 事务本质上就是一个“切面”二、核心组件三、完

GSON框架下将百度天气JSON数据转JavaBean

《GSON框架下将百度天气JSON数据转JavaBean》这篇文章主要为大家详细介绍了如何在GSON框架下实现将百度天气JSON数据转JavaBean,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录前言一、百度天气jsON1、请求参数2、返回参数3、属性映射二、GSON属性映射实战1、类对象映

Redis实现分布式锁全过程

《Redis实现分布式锁全过程》文章介绍Redis实现分布式锁的方法,包括使用SETNX和EXPIRE命令确保互斥性与防死锁,Redisson客户端提供的便捷接口,以及Redlock算法通过多节点共识... 目录Redis实现分布式锁1. 分布式锁的基本原理2. 使用 Redis 实现分布式锁2.1 获取锁

Redis分布式锁中Redission底层实现方式

《Redis分布式锁中Redission底层实现方式》Redission基于Redis原子操作和Lua脚本实现分布式锁,通过SETNX命令、看门狗续期、可重入机制及异常处理,确保锁的可靠性和一致性,是... 目录Redis分布式锁中Redission底层实现一、Redission分布式锁的基本使用二、Red

redis和redission分布式锁原理及区别说明

《redis和redission分布式锁原理及区别说明》文章对比了synchronized、乐观锁、Redis分布式锁及Redission锁的原理与区别,指出在集群环境下synchronized失效,... 目录Redis和redission分布式锁原理及区别1、有的同伴想到了synchronized关键字

解决若依微服务框架启动报错的问题

《解决若依微服务框架启动报错的问题》Invalidboundstatement错误通常由MyBatis映射文件未正确加载或Nacos配置未读取导致,需检查XML的namespace与方法ID是否匹配,... 目录ruoyi-system模块报错报错详情nacos文件目录总结ruoyi-systnGLNYpe

分布式锁在Spring Boot应用中的实现过程

《分布式锁在SpringBoot应用中的实现过程》文章介绍在SpringBoot中通过自定义Lock注解、LockAspect切面和RedisLockUtils工具类实现分布式锁,确保多实例并发操作... 目录Lock注解LockASPect切面RedisLockUtils工具类总结在现代微服务架构中,分布

Python Web框架Flask、Streamlit、FastAPI示例详解

《PythonWeb框架Flask、Streamlit、FastAPI示例详解》本文对比分析了Flask、Streamlit和FastAPI三大PythonWeb框架:Flask轻量灵活适合传统应用... 目录概述Flask详解Flask简介安装和基础配置核心概念路由和视图模板系统数据库集成实际示例Stre

Olingo分析和实践之OData框架核心组件初始化(关键步骤)

《Olingo分析和实践之OData框架核心组件初始化(关键步骤)》ODataSpringBootService通过初始化OData实例和服务元数据,构建框架核心能力与数据模型结构,实现序列化、URI... 目录概述第一步:OData实例创建1.1 OData.newInstance() 详细分析1.1.1

Jenkins分布式集群配置方式

《Jenkins分布式集群配置方式》:本文主要介绍Jenkins分布式集群配置方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1.安装jenkins2.配置集群总结Jenkins是一个开源项目,它提供了一个容易使用的持续集成系统,并且提供了大量的plugin满