本文主要是介绍JRT实现缓存协议,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
上一篇介绍的借助ORM的增、删、改和DolerGet方法,ORM可以很精准的知道热点数据做内存缓存。那么就有一个问题存在,即部署了多个站点时候,如果用户在一个Web里修改数据了,那么其他Web的ORM是不知道这个变化的,其他Web还是缓存的老数据的话就会造成其他Web命中的缓存数据是老的,造成不可靠问题。
那么就需要一种机制来解决多Web缓存不一致问题,参照ECP实现,把Web分为主服务器和从服务器。主服务器启动TCP服务端,从服务器启动TCP客户端连接主服务器,从服务器和主服务器之间一直保留TCP长连接用来通知缓存变化数据。这样在一个服务器增、删、改数据后就能及时通知其他服务器更新缓存,这样所有服务器的缓存数据都是可信赖的。
首先提取发送数据的对象类型
然后实现ECP管理类ECPManager来管理启动服务端和客户端
package JRT.DAL.ORM.Global;import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Hashtable;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;import JRT.Core.Util.LogUtils;
import JRT.DAL.ORM.Global.ECPDto;/*** 企业缓存协议,基于TCP在多台服务器直接共享缓存的更新,从而确保每个Web下的缓存数据的可靠性,这里实现TCP服务端当做小机,其他服务当做客户端和小机连接。* 小机会分发增加数据(增加数据分发与否倒是不影响缓存可靠性,主要是修改和删除)、修改数据、删除数据的执行记录给每个客户端,客户端按收到的记录把数据推入缓存*/
public class ECPManager {/*** 要推入TCP的缓存队列*/public static ConcurrentLinkedDeque ECPQuen = new ConcurrentLinkedDeque();/*** 主处理进程*/private static Thread MainThread = null;/*** 发送数据的操作对象*/public static Socket Sender = null;/*** 写操作对象*/public static PrintWriter Writer = null;/*** 数据编码*/private static String Encode="GBK";/*** 缓存所有客户端*/private static ConcurrentHashMap<String,Socket> AllClient= new ConcurrentHashMap<>();/*** IP地址*/private static String IP="";/*** 端口*/private static int Port=1991;/*** 是否启用了ECP*/private static boolean UseEcp=false;/*** 加入缓存,直接缓存,具体的后续有缓存管理器线程维护缓存,这里只管加入队列即可** @param obj* @throws Exception*/public static void InECPToQuen(ECPDto obj) throws Exception{ECPQuen.add(obj);}/*** push目前的ECP缓存数据到远程服务器,供GlobalManager定时器定时推送*/public static void TryPushEcp() throws Exception{try {//如果是客户端,先检查连接if (!IP.isEmpty()) {//重连if (Sender == null || Sender.isClosed()) {LogUtils.WriteSecurityLog("ECP尝试重连:" + IP + ":" + Port);StartEcpManager(IP, Port);Thread.sleep(1000);}if (Sender == null || Sender.isClosed()) {return;} else {LogUtils.WriteSecurityLog("ECP尝试重连成功:" + IP + ":" + Port);}}List<ECPDto> pushList=null;//从缓存队列里面弹出数据pushwhile (ECPQuen.size() > 0) {ECPDto obj = (ECPDto)ECPQuen.pop();if (obj != null) {//启用了ECP就push到服务端,否则就只更新自己缓存if(UseEcp==true){//初始化push列表if(pushList==null){pushList=new ArrayList<>();}pushList.add(obj);}else{//转换成数据实体推入缓存JRT.DAL.ORM.Global.GlobalManager.InCache(obj);}}}//直接列表一次推送,没有启用ECP的话这个列表一直是空if(pushList!=null){//客户端推送if (!IP.isEmpty()) {Writer.print(JRT.Core.Util.JsonUtil.Object2Json(pushList));Writer.flush();}//服务端推送else {//给每个连接的客户端推送信息for (String ip : AllClient.keySet()) {Socket oneClient = AllClient.get(ip);//移除关闭的客户端if (oneClient.isClosed()) {AllClient.remove(ip);}PrintWriter oneWriter = new PrintWriter(new OutputStreamWriter(oneClient.getOutputStream(), Encode), false);oneWriter.print(JRT.Core.Util.JsonUtil.Object2Json(pushList));oneWriter.flush();}}}}catch (Exception ex){LogUtils.WriteExceptionLog("推送数据到ECP异常", ex);}}/*** 启动ECP管理* @param ip* @param port*/public static void StartEcpManager(String ip, int port){IP=ip;Port=port;//当客户端if (!ip.isEmpty()) {MainThread = new Thread(new Runnable() {@Overridepublic void run() {//得到输入流InputStream inputStream = null;//创建Socket对象并连接到服务端Socket socket = null;try {//创建Socket对象并连接到服务端socket = new Socket(ip, port);Sender = socket;Writer = new PrintWriter(new OutputStreamWriter(Sender.getOutputStream(), Encode), false);//得到输入流inputStream = socket.getInputStream();//IO读取byte[] buf = new byte[10240];int readlen = 0;//阻塞读取数据while ((readlen = inputStream.read(buf)) != -1) {try {String ecpJson = new String(buf, 0, readlen, Encode);//得到ECP实体List<ECPDto> dtoList = (List<ECPDto>)JRT.Core.Util.JsonUtil.Json2Object(ecpJson, ECPDto.class);if(dtoList!=null&&dtoList.size()>0){for(ECPDto dto:dtoList) {//转换成数据实体推入缓存JRT.DAL.ORM.Global.GlobalManager.InCache(dto);}}}catch (Exception ee){LogUtils.WriteExceptionLog("ECP处理主服务器数据异常", ee);}}}catch (IOException ex) {LogUtils.WriteExceptionLog("ECP侦听TCP服务异常", ex);}finally {Sender=null;try {if (inputStream != null) {//关闭输入inputStream.close();}if (Writer != null) {Writer.flush();//关闭输出Writer.close();Writer=null;}if (socket != null) {// 关闭连接socket.close();}}catch (Exception ex) {LogUtils.WriteExceptionLog("释放TCP资源异常", ex);}}}});}//当服务端else {MainThread = new Thread(new Runnable() {@Overridepublic void run() {//得到输入流InputStream inputStream = null;//创建Socket对象并连接到服务端Socket socket = null;try {ServerSocket serverSocket = new ServerSocket(port);//增加一个无限循环while (true) {//等待客户端连接,阻塞Socket clientSocket = serverSocket.accept();//按IP存客户端连接String clientIP=clientSocket.getInetAddress().getHostAddress();AllClient.put(clientIP,clientSocket);//接收客户端消息Thread ClientThread = new Thread(new Runnable() {@Overridepublic void run() {try {//得到输出流Writer = new PrintWriter(new OutputStreamWriter(clientSocket.getOutputStream(), Encode), false);//得到输入流InputStream inputStream = clientSocket.getInputStream();//IO读取byte[] buf = new byte[10240];int readlen = 0;//阻塞读取数据while ((readlen = inputStream.read(buf)) != -1) {String ecpJson = new String(buf, 0, readlen, Encode);//得到ECP实体List<ECPDto> dtoList = (List<ECPDto>)JRT.Core.Util.JsonUtil.Json2Object(ecpJson, ECPDto.class);if(dtoList!=null&&dtoList.size()>0){for(ECPDto dto:dtoList) {//转换成数据实体推入缓存JRT.DAL.ORM.Global.GlobalManager.InCache(dto);}//给每个连接的客户端推送信息for (String ip : AllClient.keySet()) {Socket oneClient = AllClient.get(ip);//移除关闭的客户端if (oneClient.isClosed()) {AllClient.remove(ip);}PrintWriter oneWriter = new PrintWriter(new OutputStreamWriter(oneClient.getOutputStream(), Encode), false);oneWriter.print(ecpJson);oneWriter.flush();}}}}catch (Exception ee){LogUtils.WriteExceptionLog("ECP处理客户端数据异常", ee);}}});ClientThread.start();}}catch (IOException ex) {LogUtils.WriteExceptionLog("侦听仪器TCP异常", ex);}finally {try {if (inputStream != null) {//关闭输入inputStream.close();}if (Writer != null) {Writer.flush();//关闭输出Writer.close();}if (socket != null) {// 关闭连接socket.close();}} catch (Exception ex) {LogUtils.WriteExceptionLog("释放TCP资源异常", ex);}}}});}//启动主进程MainThread.start();UseEcp=true;}/*** 返回ECP待处理队列的json数据供调试看是否符合预期* @return*/public static String ViewECPQuenDate() throws Exception{return JRT.Core.Util.JsonUtil.Object2Json(ECPQuen);}}
ECP管理类再对接上GlobalManager
package JRT.DAL.ORM.Global;import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger;import JRT.Core.MultiPlatform.JRTConfigurtaion;
import JRT.Core.Util.LogUtils;
import JRT.DAL.ORM.Global.OneGlobalNode;
import JRT.DAL.ORM.Global.ECPDto;/*** 实现内存模拟global的效果*/
public class GlobalManager {/*** 在内存里缓存热点数据*/private static ConcurrentHashMap<String, ConcurrentHashMap<String, OneGlobalNode>> AllHotData = new ConcurrentHashMap<>();/*** 要缓存数据的队列*/private static ConcurrentLinkedDeque TaskQuen = new ConcurrentLinkedDeque();/*** 管理缓存的定时器*/private static Timer ManageTimer = new Timer();/*** 缓存的最大对象数量*/public static Integer GlobalCacheNum = 100000;/*** 当前的缓存数量*/private static AtomicInteger CurCacheNum=new AtomicInteger(0);/*** 最后删除数据的时间*/private static Long LastDeleteTime=null;/*** 加入缓存,直接缓存,具体的后续有缓存管理器线程维护缓存,这里只管加入队列即可** @param obj* @throws Exception*/public static void InCache(ECPDto obj) throws Exception{TaskQuen.add(obj);}/*** 通过主键查询数据* @param model* @param id* @param <T>* @return* @throws Exception*/public static <T> T DolerGet(T model,Object id) throws Exception{//实体的名称String modelName = model.getClass().getName();if(AllHotData.containsKey(modelName)){//命中数据,克隆返回if(AllHotData.get(modelName).containsKey(id)){OneGlobalNode node=AllHotData.get(modelName).get(id);//更新时间node.Time=JRT.Core.Util.TimeParser.GetTimeInMillis();Object retObj=JRT.Core.Util.JsonUtil.CloneObject(node.Data);return (T)retObj;}}return null;}/*** 启动缓存数据管理的线程*/public static void StartGlobalManagerTask() throws Exception{//最大缓存数量String GlobalCacheNumConf = JRTConfigurtaion.Configuration("GlobalCacheNum");if (GlobalCacheNumConf != null && !GlobalCacheNumConf.isEmpty()) {GlobalCacheNum = JRT.Core.Util.Convert.ToInt32(GlobalCacheNumConf);}//定时任务TimerTask timerTask = new TimerTask() {@Overridepublic void run() {try {//缓存队列的数据并入缓存while (TaskQuen.size() > 0) {//处理要加入缓存的队列DealOneDataQuen();}//尝试推送ECP数据到主服务JRT.DAL.ORM.Global.ECPManager.TryPushEcp();//清理多余的缓存数据,这里需要讲究算法,要求在上百万的缓存数据里快速找到时间最久远的数据if(CurCacheNum.get()>GlobalCacheNum){//每轮清理时间处于上次清理时间和当前时间前百分之5的老数据long Diff=(JRT.Core.Util.TimeParser.GetTimeInMillis()-LastDeleteTime)/20;//留下数据的最大时间long LeftMaxTime=LastDeleteTime+Diff;//遍历所有的热点数据for (String model : AllHotData.keySet()) {ConcurrentHashMap<String, OneGlobalNode> oneTableHot=AllHotData.get(model);//记录要删除的数据List<String> delList=new ArrayList<>();for (String key : oneTableHot.keySet()) {OneGlobalNode one=oneTableHot.get(key);//需要删除的数据if(one.Time<LeftMaxTime){delList.add(key);}}//移除时间久的数据for(String del:delList){oneTableHot.remove(del);}}}//清理时间久远的缓存数据} catch (Exception ex) {LogUtils.WriteExceptionLog("处理Global缓存异常", ex);}}};//尝试启动ECP管理器String IP = JRTConfigurtaion.Configuration("ECPIP");String Port = JRTConfigurtaion.Configuration("ECPPort");if(Port!=null&&!Port.isEmpty()){if(IP==null){IP="";}//启动ECP管理器JRT.DAL.ORM.Global.ECPManager.StartEcpManager(IP,JRT.Core.Util.Convert.ToInt32(Port));}//启动缓存管理定时器ManageTimer.schedule(timerTask, 0, 500);}/*** 通过实体名称获得实体类型信息** @param modelName 实体名称* @return*/private static Class GetTypeByName(String modelName) throws Exception {return JRT.Core.Util.ReflectUtil.GetType(JRT.Core.MultiPlatform.JRTConfigurtaion.Configuration("ModelName") + ".Entity." + modelName, JRT.Core.MultiPlatform.JRTConfigurtaion.Configuration("ModelName"));}/*** 处理队列里的一条数据并入缓存*/private static void DealOneDataQuen() {try {Object obj = TaskQuen.pop();if (obj != null) {ECPDto dto=(ECPDto)obj;//添加或者更新缓存if(dto.Cmd.equals("A")||dto.Cmd.equals("U")) {Class type = GetTypeByName(dto.Model);Object entity = JRT.Core.Util.JsonUtil.Json2Object(dto.Data, type);JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);//实体的名称String modelName = dto.Model;//得到数据的主键String id = tableInfo.ID.Value.toString();if (!AllHotData.containsKey(modelName)) {ConcurrentHashMap<String, OneGlobalNode> map = new ConcurrentHashMap<>();AllHotData.put(modelName, map);}//更新数据if (AllHotData.get(modelName).containsKey(id)) {AllHotData.get(modelName).get(id).Data = entity;AllHotData.get(modelName).get(id).Time = JRT.Core.Util.TimeParser.GetTimeInMillis();}//加入到缓存else {OneGlobalNode node = new OneGlobalNode();node.Data = entity;node.Time = JRT.Core.Util.TimeParser.GetTimeInMillis();AllHotData.get(modelName).put(id, node);//缓存数量加1CurCacheNum.addAndGet(1);//记录时间if (LastDeleteTime == null) {LastDeleteTime = JRT.Core.Util.TimeParser.GetTimeInMillis();}}}//删除缓存else if(dto.Cmd.equals("D")){Class type = GetTypeByName(dto.Model);Object entity = JRT.Core.Util.JsonUtil.Json2Object(dto.Data, type);JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);//实体的名称String modelName = dto.Model;//得到数据的主键String id = tableInfo.ID.Value.toString();if (AllHotData.containsKey(modelName)&&AllHotData.get(modelName).containsKey(id)) {AllHotData.get(modelName).remove(id);}}//清空表缓存else if(dto.Cmd.equals("CLEAR")){//实体的名称String modelName = dto.Model;if (AllHotData.containsKey(modelName)) {AllHotData.get(modelName).clear();}}}}catch (Exception ex) {LogUtils.WriteExceptionLog("处理Global缓存添加异常", ex);}}/*** 返回Global的json数据供调试看是否符合预期* @return*/public static String ViewGlobalJson() throws Exception{return JRT.Core.Util.JsonUtil.Object2Json(AllHotData);}/*** 返回Global待处理队列的json数据供调试看是否符合预期* @return*/public static String ViewGlobalTaskQuenDate() throws Exception{return JRT.Core.Util.JsonUtil.Object2Json(TaskQuen);}}
增删改和DolerGet调整
/*** 保存对象,不抛异常,执行信息通过参数输出** @param entity 实体对象* @param outParam 输出执行成功或失败信息,执行成功时输出执行记录主键* @param <T> 实体类型约束* @return影响行数*/@Overridepublic <T> int Save(T entity, OutValue outValue, OutParam outParam) throws Exception {int row = 0;PreparedStatement stmt = null;boolean innerT = false; //标识是否内部开启事务String sql = "";try {//根据实体对象获取表信息JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);HashParam hash = new HashParam();//获取插入SQL语句sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetInsertSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, hash, false);//写SQL日志JRT.Core.Util.LogUtils.WriteSqlLog("执行插入SQL:" + sql + ";SQL参数:" + JRT.Core.Util.JsonUtil.Object2Json(hash.GetParam()));//获取ID列String idKey = tableInfo.ID.Key;//声明式SQL,并设置参数if (!idKey.isEmpty()) {stmt = Manager().Connection().prepareStatement(sql, new String[]{idKey});} else {stmt = Manager().Connection().prepareStatement(sql);}String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());row = stmt.executeUpdate();ResultSet rowID = stmt.getGeneratedKeys();JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);//保存成功返回记录主键,返回影响记录数 1if (row == 1) {if (rowID.next() && (!idKey.isEmpty())) {outValue.Value = rowID.getInt(idKey);//设置RowID到实体JRT.Core.Util.ReflectUtil.SetObjValue(entity, tableInfo.ID.Key, rowID.getInt(idKey));//尝试把数据推入缓存队列Manager().TryPushToCache(entity, "A");}} else {outParam.Code = OutStatus.ERROR;outParam.Message = "保存数据失败,执行保存返回:" + row;}return row;} catch (Exception ex) {outParam.Code = OutStatus.ERROR;//操作异常,判断如果开启事务,则回滚事务if (Manager().Hastransaction) {if (!Manager().RollTransaction()) {outParam.Message = "保存数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";}}outParam.Message = "保存数据失败!" + ex.getCause().getMessage() + "执行SQL:" + sql;}//操作结束释放资源finally {if (stmt != null) {stmt.close();}//如果上层调用未开启事务,则调用结束释放数据库连接if (!Manager().Hastransaction) {Manager().Close();}}return row;}/*** 更新实体对象** @param entity 实体对象* @param param 更新条件,有条件就按条件更新,没有条件就按主键更新* @param outParam 输出执行成功或失败的信息* @param updateColName 更新属性名集合,无属性则更新实体的所有属性* @param joiner 连接符,为空或不给则按则按且连接,给的话长度应该比参数长度少1,如: and* @param operators 操作符,为空或不给的话各条件按等来比较,给的话长度应该跟参数长度一样,如: !=* @param <T> 类型限定符* @return 影响行数*/@Overridepublic <T> int Update(T entity, HashParam param, OutParam outParam, List<String> updateColName, List<String> joiner, List<String> operators) throws Exception {PreparedStatement stmt = null;if (outParam == null) outParam = new OutParam();int row = 0;boolean innerT = false; //标识是否内部开启事务try {//根据实体获取表信息JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);HashParam hash = new HashParam();//获取更新的SQL语句String sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetUpdateSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, param, updateColName, joiner, operators, hash);//写SQL日志JRT.Core.Util.LogUtils.WriteSqlLog("执行更新SQL:" + sql);//声明式SQL,并且设置参数stmt = Manager().Connection().prepareStatement(sql);String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());row = stmt.executeUpdate();JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);if (row == 1) {//尝试把数据推入缓存队列Manager().TryPushToCache(entity, "U");}outParam.Code = OutStatus.SUCCESS;outParam.Message = "更新数据成功。";return row;} catch (Exception ex) {//操作异常,判断如果开启了事务,就回滚事务outParam.Code = OutStatus.ERROR;if (Manager().Hastransaction) {if (!Manager().RollTransaction()) {outParam.Message = "更新数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";}}outParam.Message = "更新数据失败!" + ex.getCause().getMessage();}//操作结束释放资源finally {if (stmt != null) {stmt.close();}//如果上层调用未开启事务,则调用结束释放数据库连接if (!Manager().Hastransaction) {Manager().Close();}}return row;}/*** 根据条件删除记录** @param entity 实体对象* @param param 删除条件,有条件按条件删除,没有条件按主键删除* @param outParam 输出执行成功或失败的信息* @param joiner 多条件逻辑连接符,为空或不给则按则按且连接,给的话长度应该比参数长度少1,如: and* @param operators 操作符,为空或不给的话各条件按等来比较,给的话长度应该跟参数长度一样,如: !=* @param <T> 类型限定符* @return 影响行数*/@Overridepublic <T> int Remove(T entity, HashParam param, OutParam outParam, List<String> joiner, List<String> operators) throws Exception {PreparedStatement stmt = null;if (outParam == null) outParam = new OutParam();int row = 0;try {//根据实体对象获取表信息JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(entity);HashParam hash = new HashParam();//获取删除SQL语句String sql = JRT.DAL.ORM.Common.ModelToSqlUtil.GetDeleteSqlByTableInfo(Manager().GetIDbFactory(factoryName), tableInfo, param, joiner, operators, hash);//写SQL日志JRT.Core.Util.LogUtils.WriteSqlLog("执行删除SQL:" + sql);//声明式SQL,并设置参数stmt = Manager().Connection().prepareStatement(sql);String paraSql = DBParaUtil.SetDBPara(stmt, hash.GetParam());row = stmt.executeUpdate();if (row == 1) {//尝试把数据推入缓存队列Manager().TryPushToCache(entity, "D");}JRT.Core.Util.LogUtils.WriteSqlLog("参数:" + paraSql);outParam.Code = OutStatus.SUCCESS;outParam.Message = "删除数据成功。";return row;} catch (Exception ex) {//操作异常,判断如果开启了事务,就回滚事务outParam.Code = OutStatus.ERROR;if (Manager().Hastransaction) {if (!Manager().RollTransaction()) {outParam.Message = "更新数据失败!" + ex.getCause().getMessage() + ";回滚事务失败。";}}outParam.Message = "更新数据失败!" + ex.getCause().getMessage();}//操作结束释放资源finally {if (stmt != null) {stmt.close();}//如果上层调用未开启事务,则调用结束释放数据库连接if (!Manager().Hastransaction) {Manager().Close();}}return row;}/*** 通过主键查询数据,带缓存的查询,用来解决关系库的复杂关系数据获取,顶替Cache的$g** @param model 实体* @param id 主键* @param <T>* @return* @throws Exception*/public <T> T DolerGet(T model, Object id) throws Exception {T ret = GlobalManager.DolerGet(model, id);//命中缓存直接返回if (ret != null) {return ret;}else {//调用数据库查询ret = GetById(model, id);//找到数据,推入缓存if(ret!=null) {ECPDto dto = new ECPDto();dto.Cmd = "A";dto.Model = ret.getClass().getSimpleName();dto.Data = JRT.Core.Util.JsonUtil.Object2Json(ret);//通知存入缓存GlobalManager.InCache(dto);}}return ret;}/*** 通过主键查询数据,主业务数据没找到会按历史切割数量找历史表** @param model* @param id* @param <T>* @return* @throws Exception*/public <T> T GetById(T model, Object id) throws Exception {JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(model);List<ParamDto> param = new ArrayList<>();ParamDto p = new ParamDto();p.Key = tableInfo.ID.Key;p.Value = id;param.add(p);//创建实体集合List<T> lstObj = FindAll(model, param, "", -1, -1, "", null, null);//结果为空,返回一个新建的对象if (lstObj.size() == 0) {//从历史表取数据String HisTableName = tableInfo.TableInfo.HisTableName();if (!HisTableName.isEmpty()) {int cutNum=0;//指定了切割列按切割列切割if(!tableInfo.TableInfo.CutHisColName().isEmpty()){cutNum=JRT.Core.Util.Convert.ToInt32(JRT.Core.Util.ReflectUtil.GetObjValue(model,tableInfo.TableInfo.CutHisColName()).toString());}else{cutNum=JRT.Core.Util.Convert.ToInt32(tableInfo.ID.Value.toString());}//除以历史页大小算到数据该放入哪个历史表int hisNum = cutNum/HistoryPage;//分割所有历史实体String[] HisTableNameArr = HisTableName.split("^");//存放页小于所有历史表数据就做移动if (hisNum < HisTableNameArr.length) {String HisModelName = HisTableNameArr[hisNum];//得到历史表的实体Class cHis = GetTypeByName(HisModelName);//克隆得到历史表的对象Object hisData = JRT.Core.Util.JsonUtil.CloneObject(model, cHis);//创建实体集合List<T> lstHisObj = DolerFindAll(0,hisData, param, "", -1, -1, "", null, null);//结果为空,返回一个新建的对象if (lstHisObj.size() > 0) {return lstHisObj.get(0);}}}return null;}//否则返回第一个实体else {return lstObj.get(0);}}/*** 把数据安装维护的历史表大小移入历史表** @param model 实体数据* @param <T> 泛型* @return 是否成功* @throws Exception*/public <T> boolean MoveToHistory(T model) throws Exception {JRT.DAL.ORM.Common.TableInfo tableInfo = JRT.DAL.ORM.Common.ModelToSqlUtil.GetTypeInfo(model);String HisTableName = tableInfo.TableInfo.HisTableName();if (!HisTableName.isEmpty()) {//分割所有历史实体String[] HisTableNameArr = HisTableName.split("^");int cutNum=0;//指定了切割列按切割列切割if(!tableInfo.TableInfo.CutHisColName().isEmpty()){cutNum=JRT.Core.Util.Convert.ToInt32(JRT.Core.Util.ReflectUtil.GetObjValue(model,tableInfo.TableInfo.CutHisColName()).toString());}else{cutNum=JRT.Core.Util.Convert.ToInt32(tableInfo.ID.Value.toString());}//除以历史页大小算到数据该放入哪个历史表int hisNum = cutNum/HistoryPage;//存放页小于所有历史表数据就做移动if (hisNum < HisTableNameArr.length) {String HisModelName = HisTableNameArr[hisNum];//得到历史表的实体Class cHis = GetTypeByName(HisModelName);//克隆得到历史表的对象Object newData = JRT.Core.Util.JsonUtil.CloneObject(model, cHis);OutParam out = new OutParam();//保存历史数据int saveRet = Save(newData, out);if (saveRet == 1) {saveRet = Remove(model, out);}if (saveRet == 1) {return true;}}}return false;}
连接管理类调整,根据是否带事务在操作执行成功后把数据推入ECP队列供Global管理器往主服务推送分发
package JRT.DAL.ORM.DBUtility;import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Savepoint;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;import JRT.DAL.ORM.DBUtility.C3P0Util;
import com.mchange.v2.c3p0.ComboPooledDataSource;
import JRT.DAL.ORM.DBUtility.IDbFactory;
import JRT.DAL.ORM.Global.ECPDto;/*** 连接和事务管理*/
public class DBManager {/*** 驱动名称*/private String factoryName="";/*** 当前对象的驱动*/private IDbFactory factory=null;/*** 存数据库连接对象*/private Connection connection=null;/*** 要转入缓存的临时数据*/private List<ECPDto> ToEcpTmp=null;/*** 尝试把数据推入缓存队列* @param obj 对象* @param Oper 操作 A:添加 U:更新 D:删除* @throws Exception*/public void TryPushToCache(Object obj,String Oper) throws Exception{ECPDto dto=new ECPDto();dto.Cmd=Oper;dto.Model=obj.getClass().getSimpleName();dto.Data=JRT.Core.Util.JsonUtil.Object2Json(obj);//有事务就推缓存if(Hastransaction=true){if(ToEcpTmp==null){ToEcpTmp=new ArrayList<>();}ToEcpTmp.add(dto);}else{//没事务的直接推入缓存队列JRT.DAL.ORM.Global.ECPManager.InECPToQuen(dto);}}/*** 为每个数据库驱动存储工厂*/private static ConcurrentHashMap<String, IDbFactory> hsFact = new ConcurrentHashMap<>();/*** 为每个数据库驱动存储连接池*/private static ConcurrentHashMap<String, ComboPooledDataSource> hsPoll = new ConcurrentHashMap<>();/*** 得到驱动对象* @param factoryName* @return*/public IDbFactory GetIDbFactory(String factoryName){if(factory==null){factory=hsFact.get(factoryName);}return factory;}/*** 尝试初始化连接池* @param factoryName*/public static void TryInitConnPool(String factoryName) throws Exception{if(factoryName==""){factoryName="LisMianDbFactory";}if(!hsPoll.containsKey(factoryName)){IDbFactory factory=JRT.Core.Context.ObjectContainer.GetTypeObject(factoryName);hsPoll.put(factoryName,C3P0Util.GetConnPool(factory));if(!hsFact.containsKey(factoryName)){hsFact.put(factoryName,factory);}}}/*** 构造函数* @param factName 驱动配置名称* @throws Exception*/public DBManager(String factName) throws Exception{factoryName=factName;TryInitConnPool(factoryName);}/*** 存数据库连接对象*/public Connection Connection() throws Exception{if(connection==null){connection=hsPoll.get(factoryName).getConnection();}return connection;}/*** 标识是否开启事务*/public boolean Hastransaction = false;/*** 存储开启多次事务的保存点,每次调用BeginTransaction开启事务是自动创建保存点*/public LinkedList<Savepoint> Transpoints = new LinkedList<Savepoint>();/*** 获取开启的事务层级* @return*/public int GetTransactionLevel(){return this.Transpoints.size();}/*** 释放数据库连接* @return true成功释放,false释放失败*/public boolean Close() throws Exception{if(connection!=null){connection.setAutoCommit(true);connection.close();}connection=null;return true;}/*** 此方法开启事务* @return true开启事务成功,false开始事务失败*/public boolean BeginTransaction() throws Exception{try{this.Connection().setAutoCommit(false);this.Hastransaction = true;Savepoint savepoint = this.Connection().setSavepoint();Transpoints.addLast(savepoint);return true;}catch (SQLException sqle){JRT.Core.Util.LogUtils.WriteExceptionLog("开启事务失败!" + sqle.getMessage(), sqle);}return false;}/*** 回滚上一层事务* @return true回滚事务成功,false回滚事务失败*/public boolean RollTransaction() throws Exception{//删除临时数据if(ToEcpTmp!=null) {ToEcpTmp.clear();ToEcpTmp=null;}//未开启事务时,算回滚事务成功if (!this.Hastransaction){return true;}try{if (this.Transpoints.size() == 0){this.Connection().rollback();this.Hastransaction = false;}else{Savepoint point = this.Transpoints.poll();this.Connection().rollback(point);}return true;}catch (SQLException sqle){JRT.Core.Util.LogUtils.WriteExceptionLog("事务回滚失败!" + sqle.getMessage(),sqle);throw sqle;}finally{if (!this.Hastransaction){Close();}}}/*** 回滚开启的全部事务* @return true回滚事务成功,false回滚事务失败*/public boolean RollTransactionAll() throws Exception{//删除临时数据if(ToEcpTmp!=null) {ToEcpTmp.clear();ToEcpTmp=null;}//未开启事务时,算回滚事务成功if (!this.Hastransaction){return true;}try{this.Connection().rollback();this.Hastransaction = false;return true;}catch (SQLException sqle){JRT.Core.Util.LogUtils.WriteExceptionLog("回滚所有事务层级失败!" + sqle.getMessage(),sqle);throw sqle;}finally{Close();}}/*** 提交事务* @return true提交事务成功,false提交事务失败*/public boolean CommitTransaction() throws Exception{try{//临时数据推入缓存if(ToEcpTmp!=null){for(ECPDto obj:ToEcpTmp){//没事务的直接推入缓存队列JRT.DAL.ORM.Global.ECPManager.InECPToQuen(obj);}}this.Connection().commit();this.Hastransaction = false;return true;}catch (SQLException sqle){JRT.Core.Util.LogUtils.WriteExceptionLog("提交事务失败!" + sqle.getMessage(),sqle);}finally{//提交事务,不论成功与否,释放数据库连接try{Close();}catch (Exception ex){}}return false;}}
增加的配置
这样就有一个理论上比较靠谱的缓存机制了,业务用SQL查到主列表数据后,调用DolerGet的获得各种周边相关数据来组装给前台返回,就不用每个写业务的人自己考虑写复杂的级联查询数据库受不了,自己缓存数据时候缓存是否可靠,自己不缓存数据时候调用太多数据库交互又慢的问题。DolerGet基本可以满足比较无脑的多维取数据组装的要求。
这篇关于JRT实现缓存协议的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!