AOP在PowerJob中的使用,缓存锁保证并发安全,知识细节全总结

2023-10-07 08:59

本文主要是介绍AOP在PowerJob中的使用,缓存锁保证并发安全,知识细节全总结,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

这是一篇简简单单的文章,需要你简简单单看一眼就好,如果有不明白的地方,欢迎留言讨论。

 

在之前的文章中出现过一次AOP的使用,就是在运行任务之前,需要判断一下,触发该任务执行的server,是不是数据库中对应任务所在app的直接server,使用的是注解@DesignateServer,本篇文章是从另一个注解,再一次顺一遍AOP的使用,而且本篇文章的注解,再一次用到了可重入锁ReentrantLock,这个也是之前的文章中说的内容,可以再熟悉一遍,本篇文章的入口就是注解——UseCacheLock。

从名字来看,该注解是一个使用缓存时的一个锁,该类位于tech.powerjob.server.core.lock包下,用来修饰方法,在运行时执行的,源码如下:

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
public @interface UseCacheLock {String type();String key();int concurrencyLevel();
}

type:从使用的代码处得出,目前只有两种一种是processJobInstance,另一种是processWfInstance

key:主要是任务id或者任务实例id,还有工作流id。其中任务id或者任务实例id的选取,是通过一个表达式来判断得出的。

concurrencyLevel:缓存要用到的字段,允许同时并发执行的写操作数。

UseCacheLock 的使用场景

该注解在powerjob中一共使用了8次,其中2次出现在任务的派发,6次出现在工作流的操作中,这一次就选在任务的派发,来讲一下该注解的使用场景。

@UseCacheLock(type = "processJobInstance", key = "#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId", concurrencyLevel = 1024)
public void dispatch(JobInfoDO jobInfo, Long instanceId) {... ...
}

方法内部的代码不重要,主要是来看方法上面的注解,里面的三个关键字分别是

processJobInstance

#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceId

1024

第一个和最后一个没什么好说的,主要说一说中间那一条长长的表达式,该表达式通过解读,就是判断最大同时运行任务数是否大于0,以及任务的时间表达式类型是不是FIX_RATE或者FIX_DELAY。这一表达式可以说,除非人为的将MaxInstanceNum设置为0,否则该条数据默认值就是1,也就是说这个表达式,不负责任的说,99.99%都是真,也就是说都会使用JobId作为key值。

按照代码来看,就是当任务在派发的时候,会使用到该注解,为的是防止该方法同时运行派发同一个任务,如果是同时派发两个不同的任务,就不会有影响,毕竟在派发的过程中涉及到了对任务实例的数据修改,如果两个同时进行,确实会产生问题。

UseCacheLock 的AOP处理

处理该注解的类个该类在同一个包,处理的源代码如下所示:

@Around(value = "@annotation(useCacheLock)")public Object execute(ProceedingJoinPoint point, UseCacheLock useCacheLock) throws Throwable {Cache<String, ReentrantLock> lockCache = lockContainer.computeIfAbsent(useCacheLock.type(), ignore -> {int concurrencyLevel = useCacheLock.concurrencyLevel();log.info("[UseSegmentLockAspect] create Lock Cache for [{}] with concurrencyLevel: {}", useCacheLock.type(), concurrencyLevel);return CacheBuilder.newBuilder().initialCapacity(300000).maximumSize(500000).concurrencyLevel(concurrencyLevel).expireAfterWrite(30, TimeUnit.MINUTES).build();});final Method method = AOPUtils.parseMethod(point);Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);long start = System.currentTimeMillis();reentrantLock.lockInterruptibly();try {long timeCost = System.currentTimeMillis() - start;if (timeCost > SLOW_THRESHOLD) {final SlowLockEvent slowLockEvent = new SlowLockEvent().setType(SlowLockEvent.Type.LOCAL).setLockType(useCacheLock.type()).setLockKey(String.valueOf(key)).setCallerService(method.getDeclaringClass().getSimpleName()).setCallerMethod(method.getName()).setCost(timeCost);monitorService.monitor(slowLockEvent);log.warn("[UseSegmentLockAspect] wait lock for method({}#{}) cost {} ms! key = '{}', args = {}, ", method.getDeclaringClass().getSimpleName(), method.getName(), timeCost,key,JSON.toJSONString(point.getArgs()));}return point.proceed();} finally {reentrantLock.unlock();}
}

代码看着挺长的,但是内容其实没有多少,可以一步一步拆开来看。

缓存的创建

第一步通过type来获取缓存,从文章开头我们知道,这个type就两个类型,processJobInstance就是用来派发任务的,processWfInstance就是用来操作工作流任务的,该代码里面就是processJobInstance,如果缓存存在,直接拿来用,如果不存在,则创建缓存,来看一眼创建缓存的代码:

CacheBuilder.newBuilder().initialCapacity(300000).maximumSize(500000).concurrencyLevel(concurrencyLevel).expireAfterWrite(30, TimeUnit.MINUTES).build();

这个代码的大意就是创建一个有如下属性的缓存,缓存有效时间是30分钟(expireAfterWrite(30, TimeUnit.MINUTES)),这就像鱼有7秒记忆一样,这个缓存只能记录30分钟,过期失效。缓存的最大条目数是50万(maximumSize(500000))。指定用于缓存的hash table最低总规模是300000,允许同时并发操作数是concurrencyLevel,也就是传进来的1024.

key值的获取

第二步就是获取key值,该值主要是为了获取可重入锁用的,获取该值的源代码如下所示:

final Method method = AOPUtils.parseMethod(point);
Long key = AOPUtils.parseSpEl(method, point.getArgs(), useCacheLock.key(), Long.class, 1L);

从这个代码可以看到,用到了AOPUtil这个工具类的两个方法,第一个方法是解析出当前的方法,第二个是获取key值,这个AOPUtil在tech.powerjob.server.common.utils包下。解析方法的源码如下,备注解释各代码的目的:

public static Method parseMethod(ProceedingJoinPoint joinPoint) {//获取接入点的签名,此处必须是方法的签名,否则会报异常Signature pointSignature = joinPoint.getSignature();if (!(pointSignature instanceof  MethodSignature)) {throw new IllegalArgumentException("this annotation should be used on a method!");}//强转成方法的签名MethodSignature signature = (MethodSignature) pointSignature;//获取方法Method method = signature.getMethod();//如果方法所处的类是一个interfaceif (method.getDeclaringClass().isInterface()) {try {//通过IoC容器获取目标对象,然后再获取对象的方法method = joinPoint.getTarget().getClass().getDeclaredMethod(pointSignature.getName(), method.getParameterTypes());} catch (SecurityException | NoSuchMethodException e) {ExceptionUtils.rethrow(e);}}return method;
}

获取到了方法之后,就是获取key值,源代码如下,备注解释各代码的目的:

public static <T> T parseSpEl(Method method, Object[] arguments, String spEl, Class<T> clazz, T defaultResult) {//获取到方法的参数值类型String[] params = discoverer.getParameterNames(method);assert params != null;//创建数据上下文EvaluationContext context = new StandardEvaluationContext();for (int len = 0; len < params.length; len++) {//将param[len] = arguments[len]context.setVariable(params[len], arguments[len]);}try {//执行表达式,也就是前面#jobInfo.getMaxInstanceNum() > 0 || T(tech.powerjob.common.enums.TimeExpressionType).FREQUENT_TYPES.contains(#jobInfo.getTimeExpressionType()) ? #jobInfo.getId() : #instanceIdExpression expression = parser.parseExpression(spEl);//返回表达式执行的结果,以clazz设置的类型返回return expression.getValue(context, clazz);} catch (Exception e) {log.error("[AOPUtils] parse SpEL failed for method[{}], please concat @tjq to fix the bug!", method.getName(), e);return defaultResult;}
}

经过以上两步,key值就获取完毕了

加锁

加锁的源代码如下所示,就是如果缓存里面保存了锁,就直接拿到,如果没有,就new一个出来,然后就启动锁,

那两条时间主要是记录加锁的时间,如果时间过长就要记录一条日志,记录加锁慢时的任务信息。

final ReentrantLock reentrantLock = lockCache.get(String.valueOf(key), ReentrantLock::new);
long start = System.currentTimeMillis();
reentrantLock.lockInterruptibly();
try {long timeCost = System.currentTimeMillis() - start;... ...
}
... ...

加锁结束之后,就可以执行注解修饰的方法了,执行就是下面这一行:

point.proceed();

执行结束之后,将锁打开就OK了。

总结

 

本篇文章涉及的知识主要是AOP的使用,可重入锁的使用,IoC容器相关,Spring的表达式的使用,缓存Cache的创建,每一个知识点都够我喝一壶了,所以大家如果想要了解这些知识的细节,请自行搜索去查想要了解的内容,如果你懒得查,也可以问我,当然我也懒,回不回答就看我心情了,哼,我外号就叫不高兴,所以大家看着办吧。

这篇关于AOP在PowerJob中的使用,缓存锁保证并发安全,知识细节全总结的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/157341

相关文章

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式Factory工厂模式Singleton单例模式Delegate委派模式Strategy策略模式Prototype原型模式Template模板模式 Spring5 beans 接口实例化代理Bean操作 Context Ioc容器设计原理及高级特性Aop设计原理Factorybean与Beanfactory Transaction 声明式事物

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件

学习hash总结

2014/1/29/   最近刚开始学hash,名字很陌生,但是hash的思想却很熟悉,以前早就做过此类的题,但是不知道这就是hash思想而已,说白了hash就是一个映射,往往灵活利用数组的下标来实现算法,hash的作用:1、判重;2、统计次数;

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传

sqlite3 相关知识

WAL 模式 VS 回滚模式 特性WAL 模式回滚模式(Rollback Journal)定义使用写前日志来记录变更。使用回滚日志来记录事务的所有修改。特点更高的并发性和性能;支持多读者和单写者。支持安全的事务回滚,但并发性较低。性能写入性能更好,尤其是读多写少的场景。写操作会造成较大的性能开销,尤其是在事务开始时。写入流程数据首先写入 WAL 文件,然后才从 WAL 刷新到主数据库。数据在开始

pdfmake生成pdf的使用

实际项目中有时会有根据填写的表单数据或者其他格式的数据,将数据自动填充到pdf文件中根据固定模板生成pdf文件的需求 文章目录 利用pdfmake生成pdf文件1.下载安装pdfmake第三方包2.封装生成pdf文件的共用配置3.生成pdf文件的文件模板内容4.调用方法生成pdf 利用pdfmake生成pdf文件 1.下载安装pdfmake第三方包 npm i pdfma