xxl-job源码分析之--执行器(客户端)

2024-03-03 08:04

本文主要是介绍xxl-job源码分析之--执行器(客户端),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

xxl-job执行器(客户端),源码分析:

一、调用流程

  • XxlJobSpringExecutor构造器
  • XxlJobSpringExecutor.initJobHandlerMethodRepository() 通过ApplicationContext上下文获取xxljob注解,并将该注解封装IJobHandler对象保存在jobHandlerRepository中(一个map)
  • XxlJobSpringExecutor父类XxlJobExecutor.start()
  • 在start()方法中核心initEmbedServer方法,用于初始化执行器服务器,用于客户端
  • XxlJobExecutor.initEmbedServer调用EmbedServer.start()
  • EmbedServer.start作用使用netty 监听客户端本机端口;作用是接受服务器端下达执行的指令,向服务器(admin)进行注册

二、源码分析

1、XxlJobSpringExecutor声明及入口

一般我们使用springboot 集成xxljob需要手动声明XxlJobSpringExecutor,项目启动时会进入xxlJobExecutor()

@Configuration
public class XxlJobConfig {private Logger logger = LoggerFactory.getLogger(XxlJobConfig.class);@Value("${xxl.job.admin.addresses}")private String adminAddresses;@Value("${xxl.job.accessToken}")private String accessToken;@Value("${xxl.job.executor.appname}")private String appname;@Value("${xxl.job.executor.address}")private String address;@Value("${xxl.job.executor.ip}")private String ip;@Value("${xxl.job.executor.port}")private int port;@Value("${xxl.job.executor.logpath}")private String logPath;@Value("${xxl.job.executor.logretentiondays}")private int logRetentionDays;@Beanpublic XxlJobSpringExecutor xxlJobExecutor() {logger.info(">>>>>>>>>>> xxl-job config init.");XxlJobSpringExecutor xxlJobSpringExecutor = new XxlJobSpringExecutor();xxlJobSpringExecutor.setAdminAddresses(adminAddresses);xxlJobSpringExecutor.setAppname(appname);xxlJobSpringExecutor.setAddress(address);xxlJobSpringExecutor.setIp(ip);xxlJobSpringExecutor.setPort(port);xxlJobSpringExecutor.setAccessToken(accessToken);xxlJobSpringExecutor.setLogPath(logPath);xxlJobSpringExecutor.setLogRetentionDays(logRetentionDays);return xxlJobSpringExecutor;}

2、XxlJobSpringExecutor构造器

源码如下

public class XxlJobSpringExecutor extends XxlJobExecutor implements ApplicationContextAware, SmartInitializingSingleton, DisposableBean {private static final Logger logger = LoggerFactory.getLogger(XxlJobSpringExecutor.class);// start@Overridepublic void afterSingletonsInstantiated() {/*initJobHandlerRepository(applicationContext);*/// init JobHandler Repository (for method)// 初始化xxljob注解initJobHandlerMethodRepository(applicationContext);// refresh GlueFactoryGlueFactory.refreshInstance(1);// super starttry {//调用父类startsuper.start();} catch (Exception e) {throw new RuntimeException(e);}}

XxlJobSpringExecutor继承XxlJobExecutor并实现了ApplicationContextAware, SmartInitializingSingleton, DisposableBean。

afterSingletonsInstantiated()实现了是SmartInitializingSingleton方法。是 Spring Framework中的一个生命周期回调方法,它在所有单例 Bean 实例化和初始化之后调用。

在这个方法中重点initJobHandlerMethodRepository()和start()

3、initJobHandlerMethodRepository()

private void initJobHandlerMethodRepository(ApplicationContext applicationContext) {if (applicationContext == null) {return;}// init job handler from methodString[] beanDefinitionNames = applicationContext.getBeanNamesForType(Object.class, false, true);for (String beanDefinitionName : beanDefinitionNames) {Object bean = applicationContext.getBean(beanDefinitionName);Map<Method, XxlJob> annotatedMethods = null;   // referred to :org.springframework.context.event.EventListenerMethodProcessor.processBeantry {annotatedMethods = MethodIntrospector.selectMethods(bean.getClass(),new MethodIntrospector.MetadataLookup<XxlJob>() {@Overridepublic XxlJob inspect(Method method) {return AnnotatedElementUtils.findMergedAnnotation(method, XxlJob.class);}});} catch (Throwable ex) {logger.error("xxl-job method-jobhandler resolve error for bean[" + beanDefinitionName + "].", ex);}if (annotatedMethods==null || annotatedMethods.isEmpty()) {continue;}for (Map.Entry<Method, XxlJob> methodXxlJobEntry : annotatedMethods.entrySet()) {Method executeMethod = methodXxlJobEntry.getKey();XxlJob xxlJob = methodXxlJobEntry.getValue();if (xxlJob == null) {continue;}String name = xxlJob.value();if (name.trim().length() == 0) {throw new RuntimeException("xxl-job method-jobhandler name invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");}if (loadJobHandler(name) != null) {throw new RuntimeException("xxl-job jobhandler[" + name + "] naming conflicts.");}// execute method/*if (!(method.getParameterTypes().length == 1 && method.getParameterTypes()[0].isAssignableFrom(String.class))) {throw new RuntimeException("xxl-job method-jobhandler param-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +"The correct method format like \" public ReturnT<String> execute(String param) \" .");}if (!method.getReturnType().isAssignableFrom(ReturnT.class)) {throw new RuntimeException("xxl-job method-jobhandler return-classtype invalid, for[" + bean.getClass() + "#" + method.getName() + "] , " +"The correct method format like \" public ReturnT<String> execute(String param) \" .");}*/executeMethod.setAccessible(true);// init and destoryMethod initMethod = null;Method destroyMethod = null;if (xxlJob.init().trim().length() > 0) {try {initMethod = bean.getClass().getDeclaredMethod(xxlJob.init());initMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler initMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");}}if (xxlJob.destroy().trim().length() > 0) {try {destroyMethod = bean.getClass().getDeclaredMethod(xxlJob.destroy());destroyMethod.setAccessible(true);} catch (NoSuchMethodException e) {throw new RuntimeException("xxl-job method-jobhandler destroyMethod invalid, for[" + bean.getClass() + "#" + executeMethod.getName() + "] .");}}// registry jobhandlerregistJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));}}}

该方法作用是:根据参数ApplicationContext获取xxljob注解,通过反射,获取类及方法,并将保存到本地map中。

其中registJobHandler(name, new MethodJobHandler(bean, executeMethod, initMethod, destroyMethod));保存本地map中

    public static IJobHandler registJobHandler(String name, IJobHandler jobHandler){logger.info(">>>>>>>>>>> xxl-job register jobhandler success, name:{}, jobHandler:{}", name, jobHandler);return jobHandlerRepository.put(name, jobHandler);}

4、调用父类XxlJobExecutor.start() 这里是重点

public class XxlJobExecutor  {private static final Logger logger = LoggerFactory.getLogger(XxlJobExecutor.class);// ---------------------- param ----------------------private String adminAddresses;private String accessToken;private String appname;private String address;private String ip;private int port;private String logPath;private int logRetentionDays;........// ---------------------- start + stop ----------------------public void start() throws Exception {// init logpathXxlJobFileAppender.initLogPath(logPath);// init invoker, admin-clientinitAdminBizList(adminAddresses, accessToken);// init JobLogFileCleanThreadJobLogFileCleanThread.getInstance().start(logRetentionDays);// init TriggerCallbackThreadTriggerCallbackThread.getInstance().start();// init executor-server//初始化执行服务器。用户客户端 zhangminginitEmbedServer(address, ip, port, appname, accessToken);}

在这里我们重点看initEmbedServer()

    private void initEmbedServer(String address, String ip, int port, String appname, String accessToken) throws Exception {// fill ip portport = port>0?port: NetUtil.findAvailablePort(9999);ip = (ip!=null&&ip.trim().length()>0)?ip: IpUtil.getIp();// generate addressif (address==null || address.trim().length()==0) {String ip_port_address = IpUtil.getIpPort(ip, port);   // registry-address:default use address to registry , otherwise use ip:port if address is nulladdress = "http://{ip_port}/".replace("{ip_port}", ip_port_address);}// accessTokenif (accessToken==null || accessToken.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job accessToken is empty. To ensure system security, please set the accessToken.");}// startembedServer = new EmbedServer();embedServer.start(address, port, appname, accessToken);}

该方法前面是些赋值及判断,核心是后面2行

embedServer = new EmbedServer();
embedServer.start(address, port, appname, accessToken);

EmbedServer.start()

 public void start(final String address, final int port, final String appname, final String accessToken) {executorBiz = new ExecutorBizImpl();thread = new Thread(new Runnable() {@Overridepublic void run() {//使用netty 监听客户端本机端口;作用是接受服务器端下达执行的指令 zhangming// paramEventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();ThreadPoolExecutor bizThreadPool = new ThreadPoolExecutor(0,200,60L,TimeUnit.SECONDS,new LinkedBlockingQueue<Runnable>(2000),new ThreadFactory() {@Overridepublic Thread newThread(Runnable r) {return new Thread(r, "xxl-rpc, EmbedServer bizThreadPool-" + r.hashCode());}},new RejectedExecutionHandler() {@Overridepublic void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {throw new RuntimeException("xxl-job, EmbedServer bizThreadPool is EXHAUSTED!");}});try {// start serverServerBootstrap bootstrap = new ServerBootstrap();bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {@Overridepublic void initChannel(SocketChannel channel) throws Exception {channel.pipeline().addLast(new IdleStateHandler(0, 0, 30 * 3, TimeUnit.SECONDS))  // beat 3N, close if idle.addLast(new HttpServerCodec()).addLast(new HttpObjectAggregator(5 * 1024 * 1024))  // merge request & reponse to FULL.addLast(new EmbedHttpServerHandler(executorBiz, accessToken, bizThreadPool));}}).childOption(ChannelOption.SO_KEEPALIVE, true);// bindChannelFuture future = bootstrap.bind(port).sync();logger.info(">>>>>>>>>>> xxl-job remoting server start success, nettype = {}, port = {}", EmbedServer.class, port);// start registry// 向服务端(admin)进行注册 zhangmingstartRegistry(appname, address);// wait util stopfuture.channel().closeFuture().sync();} catch (InterruptedException e) {if (e instanceof InterruptedException) {logger.info(">>>>>>>>>>> xxl-job remoting server stop.");} else {logger.error(">>>>>>>>>>> xxl-job remoting server error.", e);}} finally {// stoptry {workerGroup.shutdownGracefully();bossGroup.shutdownGracefully();} catch (Exception e) {logger.error(e.getMessage(), e);}}}});thread.setDaemon(true);	// daemon, service jvm, user thread leave >>> daemon leave >>> jvm leavethread.start();}

该防范描述:

  1. 启动一个守护线程
  2. 在守护线程中使用netty监听xxl-job端口(配置文件port),主要作用用来监听服务器端下达执行job指令的。该neety接受消息和处理类为EmbedHttpServerHandler
  3. startRegistry(appname, address);向服务端进行注册,告知客户端还活着。
    public void startRegistry(final String appname, final String address) {// start registryExecutorRegistryThread.getInstance().start(appname, address);}

ExecutorRegistryThread.getInstance().start()使用http请求,每30s向服务端进行报道一次;
这里又开启一个线程,每隔30s不间断地while

 public void start(final String appname, final String address){// validif (appname==null || appname.trim().length()==0) {logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, appname is null.");return;}if (XxlJobExecutor.getAdminBizList() == null) {logger.warn(">>>>>>>>>>> xxl-job, executor registry config fail, adminAddresses is null.");return;}registryThread = new Thread(new Runnable() {@Overridepublic void run() {// 每30秒钟向服务器端进行注册(心跳)zhangming// registrywhile (!toStop) {try {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registry(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.debug(">>>>>>>>>>> xxl-job registry success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {logger.info(">>>>>>>>>>> xxl-job registry error, registryParam:{}", registryParam, e);}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}try {if (!toStop) {TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);}} catch (InterruptedException e) {if (!toStop) {logger.warn(">>>>>>>>>>> xxl-job, executor registry thread interrupted, error msg:{}", e.getMessage());}}}// registry removetry {RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appname, address);for (AdminBiz adminBiz: XxlJobExecutor.getAdminBizList()) {try {ReturnT<String> registryResult = adminBiz.registryRemove(registryParam);if (registryResult!=null && ReturnT.SUCCESS_CODE == registryResult.getCode()) {registryResult = ReturnT.SUCCESS;logger.info(">>>>>>>>>>> xxl-job registry-remove success, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});break;} else {logger.info(">>>>>>>>>>> xxl-job registry-remove fail, registryParam:{}, registryResult:{}", new Object[]{registryParam, registryResult});}} catch (Exception e) {if (!toStop) {logger.info(">>>>>>>>>>> xxl-job registry-remove error, registryParam:{}", registryParam, e);}}}} catch (Exception e) {if (!toStop) {logger.error(e.getMessage(), e);}}logger.info(">>>>>>>>>>> xxl-job, executor registry thread destory.");}});registryThread.setDaemon(true);registryThread.setName("xxl-job, executor ExecutorRegistryThread");registryThread.start();}

三、总结

xxl-job执行器(客户端的源码还是比较简单的),大致步骤如下
当程序启动时,依次执行:

  1. 声明XxlJobSpringExecutor
  2. 在XxlJobSpringExecutor调用自身initJobHandlerMethodRepository方法,通过反射将xxljob的类、方法信息放在本地map中
  3. 在XxlJobSpringExecutor调用父类start方法,该方法重要调用自身的initEmbedServer()
  4. initEmbedServer()调用EmbedServer.start(),该start()作用:使用netty 监听本地端口,主要用来接受服务端(admin)下发执行指令;startRegistry()向服务端(admin)进行注册报道(使用http,默认间隔30s)

后续再写一篇服务端的源码解析

这篇关于xxl-job源码分析之--执行器(客户端)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/768994

相关文章

Java调用C++动态库超详细步骤讲解(附源码)

《Java调用C++动态库超详细步骤讲解(附源码)》C语言因其高效和接近硬件的特性,时常会被用在性能要求较高或者需要直接操作硬件的场合,:本文主要介绍Java调用C++动态库的相关资料,文中通过代... 目录一、直接调用C++库第一步:动态库生成(vs2017+qt5.12.10)第二步:Java调用C++

kotlin中const 和val的区别及使用场景分析

《kotlin中const和val的区别及使用场景分析》在Kotlin中,const和val都是用来声明常量的,但它们的使用场景和功能有所不同,下面给大家介绍kotlin中const和val的区别,... 目录kotlin中const 和val的区别1. val:2. const:二 代码示例1 Java

Go标准库常见错误分析和解决办法

《Go标准库常见错误分析和解决办法》Go语言的标准库为开发者提供了丰富且高效的工具,涵盖了从网络编程到文件操作等各个方面,然而,标准库虽好,使用不当却可能适得其反,正所谓工欲善其事,必先利其器,本文将... 目录1. 使用了错误的time.Duration2. time.After导致的内存泄漏3. jsO

Python实现无痛修改第三方库源码的方法详解

《Python实现无痛修改第三方库源码的方法详解》很多时候,我们下载的第三方库是不会有需求不满足的情况,但也有极少的情况,第三方库没有兼顾到需求,本文将介绍几个修改源码的操作,大家可以根据需求进行选择... 目录需求不符合模拟示例 1. 修改源文件2. 继承修改3. 猴子补丁4. 追踪局部变量需求不符合很

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

找不到Anaconda prompt终端的原因分析及解决方案

《找不到Anacondaprompt终端的原因分析及解决方案》因为anaconda还没有初始化,在安装anaconda的过程中,有一行是否要添加anaconda到菜单目录中,由于没有勾选,导致没有菜... 目录问题原因问http://www.chinasem.cn题解决安装了 Anaconda 却找不到 An

Spring定时任务只执行一次的原因分析与解决方案

《Spring定时任务只执行一次的原因分析与解决方案》在使用Spring的@Scheduled定时任务时,你是否遇到过任务只执行一次,后续不再触发的情况?这种情况可能由多种原因导致,如未启用调度、线程... 目录1. 问题背景2. Spring定时任务的基本用法3. 为什么定时任务只执行一次?3.1 未启用

C++ 各种map特点对比分析

《C++各种map特点对比分析》文章比较了C++中不同类型的map(如std::map,std::unordered_map,std::multimap,std::unordered_multima... 目录特点比较C++ 示例代码 ​​​​​​代码解释特点比较1. std::map底层实现:基于红黑

Spring、Spring Boot、Spring Cloud 的区别与联系分析

《Spring、SpringBoot、SpringCloud的区别与联系分析》Spring、SpringBoot和SpringCloud是Java开发中常用的框架,分别针对企业级应用开发、快速开... 目录1. Spring 框架2. Spring Boot3. Spring Cloud总结1. Sprin

Spring 中 BeanFactoryPostProcessor 的作用和示例源码分析

《Spring中BeanFactoryPostProcessor的作用和示例源码分析》Spring的BeanFactoryPostProcessor是容器初始化的扩展接口,允许在Bean实例化前... 目录一、概览1. 核心定位2. 核心功能详解3. 关键特性二、Spring 内置的 BeanFactory