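A First Look at the Tx-lcn Distributed Transaction Framework
- Architecture
- Underlying communication
- Client registration
- Transaction message notification
- Transaction start annotation
- Transaction lock implementation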
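Architecture
TX-LCN consists of two main modules: TxClient and TxManager.
TxManager is deployed as a standalone service.
TxClient is the framework dependency pulled into each application module and provides the standard TX-LCN support. Both the transaction initiator and the participants are TxClients.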
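Underlying communication
Client registration
The client registers with TxManager over HTTP and obtains the address to use for netty communication.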
Class: com.codingapi.tx.config.ConfigReader

public String getTxUrl() {
    try {
        txManagerTxUrlService = spring.getBean(TxManagerTxUrlService.class);
    } catch (Exception e) {
        logger.debug("load default txManagerTxUrlService ");
    }
    if (txManagerTxUrlService == null) {
        // No custom bean found: fall back to reading "url" from tx.properties
        txManagerTxUrlService = new TxManagerTxUrlService() {
            private final String configName = "tx.properties";
            private final String configKey = "url";

            @Override
            public String getTxUrl() {
                return ConfigUtils.getString(configName, configKey);
            }
        };
        logger.debug("load default txManagerTxUrlService");
    } else {
        logger.debug("load txManagerTxUrlService");
    }
    // TxManagerTxUrlService is the class you implement yourself to suit your business needs
    return txManagerTxUrlService.getTxUrl();
}
Class: com.codingapi.tx.netty.service.impl.MQTxManagerServiceImpl

@Override
public String httpGetServer() {
    String url = configReader.getTxUrl() + "getServer";
    return managerHelper.httpGet(url);
}
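Because httpGetServer() appends "getServer" directly to the configured value, the url entry in tx.properties effectively has to end with a slash. The following is a minimal sketch of that lookup, not TX-LCN code: the class TxUrlSketch and its methods are hypothetical, and the only detail taken from the listing above is the "url" key and the "getServer" suffix. Unlike the library code, the sketch tolerates a missing trailing slash.

```java
import java.util.Properties;

/** Hypothetical helper (not part of TX-LCN): resolves a TxManager endpoint URL. */
final class TxUrlSketch {
    private TxUrlSketch() {}

    /** Reads the base URL from the "url" key, the same key the default provider uses. */
    static String baseUrl(Properties props) {
        String url = props.getProperty("url");
        if (url == null || url.trim().isEmpty()) {
            throw new IllegalStateException("missing 'url' entry");
        }
        return url.trim();
    }

    /** Joins the base URL and a path, adding the slash if the base URL lacks one. */
    static String endpoint(Properties props, String path) {
        String base = baseUrl(props);
        return base.endsWith("/") ? base + path : base + "/" + path;
    }
}
```

With a Properties object whose url value is a placeholder address such as http://127.0.0.1:8899/tx/manager/, TxUrlSketch.endpoint(props, "getServer") yields that address followed by getServer.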
Transaction message notification
Transaction notifications are exchanged over TCP using netty.
Class: com.codingapi.tx.netty.service.impl.NettyDistributeServiceImpl

private void getTxServer() {
    // Fetch the load-balanced server address
    String json = null;
    while (StringUtils.isEmpty(json)) {
        json = txManagerService.httpGetServer();
        logger.info("get txManager ->" + json);
        if (StringUtils.isEmpty(json)) {
            logger.error("TxManager server is unreachable.");
            try {
                Thread.sleep(1000 * 2);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    TxServer txServer = TxServer.parser(json);
    if (txServer != null) {
        logger.debug("txServer -> " + txServer);
        logger.info(txServer.toString());
        Constants.txServer = txServer;
        logger.info(Constants.txServer.toString());
        connectCont = 0;
    }
}
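The loop above keeps asking TxManager for a server address every two seconds until it gets a non-empty answer. A minimal sketch of that polling pattern follows. PollSketch is a hypothetical class, not a TX-LCN type, and it differs from the listing in one respect: instead of swallowing InterruptedException it restores the interrupt flag and stops polling.

```java
import java.util.function.Supplier;

/** Hypothetical sketch (not TX-LCN code): poll a source until it returns a non-empty string. */
final class PollSketch {
    private PollSketch() {}

    static String pollUntilPresent(Supplier<String> source, long delayMillis) {
        while (true) {
            String value = source.get();
            if (value != null && !value.isEmpty()) {
                return value;
            }
            try {
                Thread.sleep(delayMillis); // back off before asking again
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to callers
                throw new IllegalStateException("interrupted while waiting for a server address", e);
            }
        }
    }
}
```

Passing the HTTP lookup as a Supplier keeps the retry logic testable without a running TxManager.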
Class: com.codingapi.tx.netty.service.impl.NettyServiceImpl

@Override
public synchronized void start() {
    if (isStarting) {
        return;
    }
    isStarting = true;
    nettyDistributeService.loadTxServer();

    String host = Constants.txServer.getHost();
    int port = Constants.txServer.getPort();
    final int heart = Constants.txServer.getHeart();
    int delay = Constants.txServer.getDelay();

    final TransactionHandler transactionHandler = new TransactionHandler(threadPool, nettyControlService, delay);
    workerGroup = new NioEventLoopGroup();
    try {
        Bootstrap b = new Bootstrap(); // (1)
        b.group(workerGroup); // (2)
        b.channel(NioSocketChannel.class); // (3)
        b.option(ChannelOption.SO_KEEPALIVE, true); // (4)
        b.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ch.pipeline().addLast("timeout", new IdleStateHandler(heart, heart, heart, TimeUnit.SECONDS));
                ch.pipeline().addLast(new LengthFieldPrepender(4, false));
                ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                ch.pipeline().addLast(transactionHandler);
            }
        });
        // Start the client.
        logger.info("connection txManager-socket-> host:" + host + ",port:" + port);
        ChannelFuture future = b.connect(host, port); // (5)
        future.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture channelFuture) throws Exception {
                if (!channelFuture.isSuccess()) {
                    // Connection failed: retry start() after 5 seconds
                    channelFuture.channel().eventLoop().schedule(new Runnable() {
                        @Override
                        public void run() {
                            isStarting = false;
                            start();
                        }
                    }, 5, TimeUnit.SECONDS);
                }
            }
        });
    } catch (Exception e) {
        logger.error(e.getLocalizedMessage());
    }
}
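The pipeline above frames every message with a 4-byte length prefix: LengthFieldPrepender(4, false) writes the payload length in front of each outgoing message, and LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4) reads that prefix and strips it from incoming frames. The sketch below reproduces the same wire format with plain java.nio so the framing can be seen without netty. FrameSketch is a hypothetical class, and UTF-8 payloads are an assumption made for the example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch (not TX-LCN code): 4-byte big-endian length-prefixed framing. */
final class FrameSketch {
    private FrameSketch() {}

    /** Prepends a 4-byte length that counts only the payload bytes, not the prefix itself. */
    static byte[] encode(String message) {
        byte[] payload = message.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + payload.length)
                .putInt(payload.length)
                .put(payload)
                .array();
    }

    /** Reads every complete frame; a partial trailing frame is left unread in the buffer. */
    static List<String> decode(ByteBuffer in) {
        List<String> frames = new ArrayList<>();
        while (in.remaining() >= 4) {
            in.mark();
            int length = in.getInt();
            if (length < 0) {
                throw new IllegalArgumentException("negative frame length: " + length);
            }
            if (in.remaining() < length) {
                in.reset(); // not enough bytes yet, rewind to the start of this frame
                break;
            }
            byte[] payload = new byte[length];
            in.get(payload);
            frames.add(new String(payload, StandardCharsets.UTF_8));
        }
        return frames;
    }
}
```

Length-prefixed framing is what lets the receiver split a TCP byte stream back into whole JSON messages, since TCP itself preserves no message boundaries.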
Transaction start annotation
@TxTransaction(isStart = false/true)
1. Transaction control is implemented with an aspect
public class AspectBeforeServiceImpl implements AspectBeforeService {

    @Autowired
    private TransactionServerFactoryService transactionServerFactoryService;

    private Logger logger = LoggerFactory.getLogger(AspectBeforeServiceImpl.class);

    public Object around(String groupId, ProceedingJoinPoint point) throws Throwable {
        MethodSignature signature = (MethodSignature) point.getSignature();
        Method method = signature.getMethod();
        Class<?> clazz = point.getTarget().getClass();
        Object[] args = point.getArgs();
        Method thisMethod = clazz.getMethod(method.getName(), method.getParameterTypes());

        TxTransaction transaction = thisMethod.getAnnotation(TxTransaction.class);
        TxTransactionLocal txTransactionLocal = TxTransactionLocal.current();
        logger.debug("around--> groupId-> " + groupId + ",txTransactionLocal->" + txTransactionLocal);

        TransactionInvocation invocation = new TransactionInvocation(clazz, thisMethod.getName(), thisMethod.toString(), args, method.getParameterTypes());
        TxTransactionInfo info = new TxTransactionInfo(transaction, txTransactionLocal, invocation, groupId);
        TransactionServer server = transactionServerFactoryService.createTransactionServer(info);
        return server.execute(point, info); // transaction control
    }
}
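The aspect looks the method up again on the target class before reading the annotation. A likely reason is that the signature's method can be the interface method when the bean is proxied through its interface, and annotations placed on the implementing method are not visible there. The sketch below illustrates that lookup under this assumption. DemoTx, OrderService, OrderServiceImpl and AnnotationLookupSketch are all hypothetical stand-ins, with DemoTx playing the role of @TxTransaction.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

/** Hypothetical stand-in for @TxTransaction. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface DemoTx {
    boolean isStart() default false;
}

interface OrderService {
    void pay(String orderId);
}

class OrderServiceImpl implements OrderService {
    @DemoTx(isStart = true) // annotation lives on the implementation only
    @Override
    public void pay(String orderId) { }
}

final class AnnotationLookupSketch {
    private AnnotationLookupSketch() {}

    /** Wraps Class.getMethod so callers do not deal with the checked exception. */
    static Method method(Class<?> type, String name, Class<?>... params) {
        try {
            return type.getMethod(name, params);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(e);
        }
    }

    /** Resolves the same method on the target's class, then reads the annotation there. */
    static DemoTx find(Object target, Method signatureMethod) {
        Method implMethod = method(target.getClass(),
                signatureMethod.getName(), signatureMethod.getParameterTypes());
        return implMethod.getAnnotation(DemoTx.class);
    }
}
```

Reading the annotation from OrderService.pay returns null, while AnnotationLookupSketch.find on an OrderServiceImpl instance returns the annotation with isStart set to true.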
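2. The transaction control class has three implementations
Transaction control interface: com.codingapi.tx.aop.service.TransactionServer
Implementations: (not listed in the original)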
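Transaction lock implementation
The transaction-lock task uses a Lock together with a Condition. The business thread blocks by waiting on the condition, and when a notification message arrives the task is signalled so the thread can continue. The final outcome is enforced by throwing an exception once a rollback message has been received.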
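The wait-and-signal mechanism described here can be sketched with java.util.concurrent.locks. This is a simplified illustration under stated assumptions, not the library's task class: WaitTaskSketch, its state codes and the timeout handling are all hypothetical.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical sketch (not TX-LCN code): block on a Condition until a decision arrives. */
final class WaitTaskSketch {
    static final int ROLLBACK = 0;       // hypothetical state codes
    static final int COMMIT = 1;
    private static final int PENDING = -100;

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition decided = lock.newCondition();
    private int state = PENDING;

    /** Returns normally on commit; throws on rollback or timeout so the caller rolls back. */
    void awaitDecision(long timeout, TimeUnit unit) {
        long nanos = unit.toNanos(timeout);
        lock.lock();
        try {
            while (state == PENDING) { // loop guards against spurious wakeups
                if (nanos <= 0L) {
                    throw new IllegalStateException("timed out waiting for a decision");
                }
                nanos = decided.awaitNanos(nanos);
            }
            if (state != COMMIT) {
                throw new IllegalStateException("transaction group rolled back");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        } finally {
            lock.unlock();
        }
    }

    /** Called by the message handler: record the decision and wake the waiting thread. */
    void signal(int newState) {
        lock.lock();
        try {
            state = newState;
            decided.signalAll();
        } finally {
            lock.unlock();
        }
    }
}
```

Because the waiter checks the state before blocking, a signal that arrives before the wait starts is not lost. The library code takes a different route to the same goal: the notifier polls isAwait() until the task is actually waiting before it signals.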
On receiving a netty message:
Message handling class: com.codingapi.tx.netty.service.impl.NettyControlServiceImpl

@Override
public void executeService(final ChannelHandlerContext ctx, final String json) {
    if (StringUtils.isNotEmpty(json)) {
        JSONObject resObj = JSONObject.parseObject(json);
        if (resObj.containsKey("a")) {
            // A command sent by the TxManager (tm) to the tx module
            transactionControlService.notifyTransactionMsg(ctx, resObj, json);
        } else {
            // The response to data that the tx module sent to the tm
            String key = resObj.getString("k");
            responseMsg(key, resObj);
        }
    }
}
------------------> Notification message

@Override
public void notifyTransactionMsg(ChannelHandlerContext ctx, JSONObject resObj, String json) {
    String action = resObj.getString("a");
    String key = resObj.getString("k");
    // The action name doubles as the bean name of the handler
    IActionService actionService = spring.getBean(action, IActionService.class);
    String res = actionService.execute(resObj, json);

    JSONObject data = new JSONObject();
    data.put("k", key);
    data.put("a", action);
    JSONObject params = new JSONObject();
    params.put("d", res);
    data.put("p", params);

    SocketUtils.sendMsg(ctx, data.toString());
    logger.debug("send notify data ->" + data.toString());
}
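Taken together, the two handlers above route a message by the presence of the "a" key, look up a handler by action name, and reply with the same "k" and "a" plus the result under "p" and "d". A minimal sketch of that routing follows, using plain maps in place of the JSON objects and a registry in place of the Spring bean lookup. DispatchSketch and the action name used with it are hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch (not TX-LCN code): route messages by the "a" key. */
final class DispatchSketch {
    private final Map<String, Function<Map<String, Object>, String>> actions = new HashMap<>();
    private final Map<String, Map<String, Object>> responses = new HashMap<>();

    /** Registers a handler under an action name (the listing uses a bean name lookup instead). */
    void register(String action, Function<Map<String, Object>, String> handler) {
        actions.put(action, handler);
    }

    /** Returns the reply for a command, or null when the message was a response. */
    Map<String, Object> handle(Map<String, Object> msg) {
        String key = (String) msg.get("k");
        if (!msg.containsKey("a")) {
            responses.put(key, msg); // response to an earlier request, matched by key
            return null;
        }
        String action = (String) msg.get("a");
        Function<Map<String, Object>, String> handler = actions.get(action);
        if (handler == null) {
            throw new IllegalArgumentException("unknown action: " + action);
        }
        Map<String, Object> params = new HashMap<>();
        params.put("d", handler.apply(msg));
        Map<String, Object> reply = new HashMap<>();
        reply.put("k", key);
        reply.put("a", action);
        reply.put("p", params);
        return reply;
    }

    /** Looks up a stored response by its key. */
    Map<String, Object> response(String key) {
        return responses.get(key);
    }
}
```

Echoing the "k" value back is what allows the other side to pair each reply with the request that caused it.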
-----------------------> Update the task state on notification and release the lock

@Override
public String execute(JSONObject resObj, String json) {
    String res;
    // Notification
    final int state = resObj.getInteger("c");
    String taskId = resObj.getString("t");
    if (transactionControl.executeTransactionOperation()) {
        TaskGroup task = TaskGroupManager.getInstance().getTaskGroup(taskId);
        logger.info("accept notify data ->" + json);
        if (task != null) {
            if (task.isAwait()) { // already waiting
                res = notifyWaitTask(task, state);
            } else {
                // Poll up to about 500 times, 1 ms apart, for the task to start waiting
                int index = 0;
                while (true) {
                    if (index > 500) {
                        res = "0";
                        break;
                    }
                    if (task.isAwait()) { // already waiting
                        res = notifyWaitTask(task, state);
                        break;
                    }
                    index++;
                    try {
                        Thread.sleep(1);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        } else {
            res = "0";
        }
    } else {
        // Non-transactional operation
        res = "1";
        transactionControl.autoNoTransactionOperation();
    }
    logger.info("accept notify response res ->" + res);
    return res;
}

----------------------> Update the task state and release the lock

private String notifyWaitTask(TaskGroup task, int state) {
    String res;
    task.setState(state);
    task.signalTask();
    int count = 0;
    while (true) {
        if (task.isRemove()) {
            if (task.getState() == TaskState.rollback.getCode()
                    || task.getState() == TaskState.commit.getCode()) {
                res = "1";
            } else {
                res = "0";
            }
            break;
        }
        if (count > 1000) {
            // The task has been notified, but it may have failed.
            res = "2";
            break;
        }
        count++;
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
    return res;
}

------------------> The transaction aspect handles the unlock message and throws an exception

@Override
public Object execute(final ProceedingJoinPoint point, final TxTransactionInfo info) throws Throwable {
    String kid = KidUtils.generateShortUuid();
    String txGroupId = info.getTxGroupId();
    logger.debug("--->begin running transaction,groupId:" + txGroupId);
    long t1 = System.currentTimeMillis();

    boolean isHasIsGroup = transactionControl.hasGroup(txGroupId);

    TxTransactionLocal txTransactionLocal = new TxTransactionLocal();
    txTransactionLocal.setGroupId(txGroupId);
    txTransactionLocal.setHasStart(false);
    txTransactionLocal.setKid(kid);
    txTransactionLocal.setHasIsGroup(isHasIsGroup);
    txTransactionLocal.setMaxTimeOut(Constants.txServer.getCompensateMaxWaitTime());
    TxTransactionLocal.setCurrent(txTransactionLocal);

    try {
        Object res = point.proceed();
        // Handle write operations
        if (!txTransactionLocal.isReadOnly()) {
            String methodStr = info.getInvocation().getMethodStr();
            TxGroup resTxGroup = txManagerService.addTransactionGroup(txGroupId, kid, isHasIsGroup, methodStr);
            // Skip this step if the module has already been entered
            if (!isHasIsGroup) {
                String type = txTransactionLocal.getType();
                TxTask waitTask = TaskGroupManager.getInstance().getTask(kid, type);
                // Wait until the LCN connection has started waiting on the lock
                while (waitTask != null && !waitTask.isAwait()) {
                    TimeUnit.MILLISECONDS.sleep(1);
                }
                if (resTxGroup == null) {
                    // Tell the business side to roll back the transaction
                    if (waitTask != null) {
                        // Updating the transaction group state failed
                        waitTask.setState(-1);
                        waitTask.signalTask();
                        // Throw an exception to trigger the rollback
                        throw new ServiceException("update TxGroup error, groupId:" + txGroupId);
                    }
                }
            }
        }
        return res;
    } catch (Throwable e) {
        throw e;
    } finally {
        TxTransactionLocal.setCurrent(null);
        long t2 = System.currentTimeMillis();
        logger.debug("<---end running transaction,groupId:" + txGroupId + ",execute time:" + (t2 - t1));
    }
}
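The aspect publishes the transaction context with TxTransactionLocal.setCurrent(...) before calling point.proceed() and clears it in the finally block. A minimal sketch of that set-then-always-clear pattern follows, assuming the context is held in a ThreadLocal (the listing does not show how TxTransactionLocal stores it). TxContextSketch is a hypothetical class.

```java
import java.util.function.Supplier;

/** Hypothetical sketch (not TX-LCN code): a per-thread transaction context. */
final class TxContextSketch {
    private static final ThreadLocal<TxContextSketch> CURRENT = new ThreadLocal<>();

    final String groupId;

    private TxContextSketch(String groupId) {
        this.groupId = groupId;
    }

    /** Returns the context bound to the calling thread, or null if none is bound. */
    static TxContextSketch current() {
        return CURRENT.get();
    }

    /** Binds a context for the duration of the body and always clears it afterwards. */
    static <T> T runInGroup(String groupId, Supplier<T> body) {
        CURRENT.set(new TxContextSketch(groupId));
        try {
            return body.get();
        } finally {
            CURRENT.remove(); // clear even when the body throws
        }
    }
}
```

Clearing in finally matters on pooled threads: a context left behind would leak into the next unrelated request served by the same thread.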