SpringBatch从入门到实战(二):HelloWorld

2023-10-11 23:10

本文主要是介绍SpringBatch从入门到实战(二):HelloWorld,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一:HelloWorld

1.1 配置Job、Step、Tasklet

在这里插入图片描述

@Configuration
public class HelloWorldJobConfig {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job helloWorldJob() {return jobBuilderFactory.get("helloWorldJob").start(hellWorldStep()).build();}@Beanpublic Step hellWorldStep() {return stepBuilderFactory.get("hellWorldStep").tasklet(hellWorldTasklet()).build();}@Beanpublic Tasklet hellWorldTasklet() {return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("hello world spring batch");return RepeatStatus.FINISHED;}};}
}

RepeatStatus枚举有两个值:

  • FINISHED:表示完成,一般都是返回FINISHED。
  • CONTINUABLE:继续,如果返回该值,会死循环的执行execute()方法。

1.2 手动触发作业Job

@RestController
@RequestMapping("/job")
public class JobController {@Autowiredprivate JobLauncher jobLauncher;@Autowired@Qualifier("helloWorldJob")private Job job;@RequestMapping("/start")public ExitStatus start() throws Exception {JobExecution jobExecution = jobLauncher.run(job, new JobParameters());return jobExecution.getExitStatus();}
}

在这里插入图片描述

在这里插入图片描述

1.3 基础概念

  • JobLauncher作业启动器,启动作业的入口。对应的实现类为SimpleJobLauncher。
  • Job作业,用于配置作业的相关配置,一个作业可以配置多个步骤,步骤之间是有序的。
  • Step步骤,作业具体执行的业务逻辑,一个Job可以配置多个Step。步骤有两种实现方式:
    • Tasklet方式:所有作业逻辑写在一个方法中。
    • Chunk方式:将一个完整的作业逻辑根据作用拆分到三个方法中
      • ItemReader:负责从数据源中读数据(如从文件、数据库等)。
      • ItemProcessor :负责对读出来的数据进行非法校验和对数据进行加工。
      • ItemWriter:将数据写到某个目标中(如文件、数据库等)。
  • JobBuilderFactory作业构建起工厂,用于构建作业Job对象。
    • get(String name):设置作业名称。
    • start(Step step):设置作业启动的第一个步骤。
    • build():构建Job对象。
  • StepBuilderFactory作业构建器工厂,用于构造步骤Step对象。
    • get(String name):设置步骤名称。
    • tasklet(Tasklet tasklet):设置Tasklet。
    • build():构建Step对象。
  • Tasklet:用来封装批处理具体的业务逻辑。
  • JobRepository:作业持久化,在执行作业的过程中用于操作spring batch相关的表,记录作业的相关状态等。

1.4 JobInstance作业实例和JobExecution作业执行

  • exitCode=COMPLETED, 表示一个执行实例,往BATCH_JOB_INSTANCE表中插入一条数据。
  • 每执行一次作业(无论exitCode是什么)都会往BATCH_JOB_EXECUTION表中插入一条数据。
  • 当exitCode=COMPLETED时,一个作业中配置了多少个步骤就往BATCH_STEP_EXECUTION表插入多少条数据。

在这里插入图片描述

在这里插入图片描述

在这里插入图片描述

1.5 第二次启动作业

在这里插入图片描述

注意:exitCode: COMPLETED,表示作业真正完成。一旦成功执行了这次作业,就不允许执行第二次,如果再执行会返回NOOP,表示无效的作业。 如果想重复执行可以改一下jobName或者改一下jobParameters。

每执行一次(无论exitCode是什么值)就会往BATCH_JOB_EXECUTION表插入一条记录,所以第二次执行作业也会插入一条数据,但是status=COMPLETEDexit_code=NOOP,exit_code表示此次的退出状态,status表示整个作业状态。

在这里插入图片描述

1.6 Job、JobInstance、JobExecution关系

  • 同一个job_name在BATCH_JOB_INSTANCE表中有多少条数据取决于 (job_name + job_key) 作为联合主键,job_key又依赖于JobParameters,同一个Job可以有不同的JobParameters,所以Job和JobInstance是多对多的关系。
  • 每执行一次作业就会生成一条JobExecution(无论exitCode是什么),当作业执行成功时有1条JobExecution,当执行失败时可以重新执行,JobExecution会大于1条,所以JobInstance和JobExecution是多对多的关系。

在这里插入图片描述

二:多步骤

一个Job可以配置多个Step,多个步骤按照定义的先后顺序执行:

  • start(Step):定义第一个步骤。
  • next(Step):定义其它后续步骤。
@Bean
public Job helloWorldJob() {return jobBuilderFactory.get("helloWorldJob")// 第一个步骤.start(hellWorldStep())// 其它步骤.next(hellWorldStep2()).build();
}@Bean
public Step hellWorldStep() {return stepBuilderFactory.get("hellWorldStep").tasklet(hellWorldTasklet()).build();
}@Bean
public Tasklet hellWorldTasklet() {return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println(" hello world spring batch");return RepeatStatus.FINISHED;}};
}@Bean
public Step hellWorldStep2() {return stepBuilderFactory.get("hellWorldStep2").tasklet(hellWorldTasklet2()).build();
}@Bean
public Tasklet hellWorldTasklet2() {return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {JobExecution jobExecution = stepContribution.getStepExecution().getJobExecution();// STARTEDSystem.out.println(jobExecution.getStatus());return RepeatStatus.FINISHED;}};
}

在这里插入图片描述

三:流式步骤 Flow

流式步骤是一系列有序子步骤的集合,将一个大的步骤拆分成多个有序子步骤。执行顺序:

  1. Step1: tasklet1
  2. Step2: Flow
    • 2.1 Step21:tasklet21
    • 2.2 Step22:tasklet22
    • 2.3 Step23:tasklet23
  3. Step3: tasklet3
@Configuration
public class HelloWorldFlowJobConfig {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job flowStepJob() {return jobBuilderFactory.get("flowStepJob").start(step1()).next(step2()).next(step3()).build();}@Beanpublic Step step1() {return stepBuilderFactory.get("step1").tasklet(tasklet1()).build();}@Beanpublic Step step2() {// 将Flow包装成Stepreturn stepBuilderFactory.get("step2").flow(flow2()).build();}@Beanpublic Flow flow2() {return new FlowBuilder<Flow>("flow2").start(step21()).next(step22()).next(step23()).build();}@Beanpublic Step step21() {return stepBuilderFactory.get("step21").tasklet(tasklet21()).build();}@Beanpublic Step step22() {return stepBuilderFactory.get("step22").tasklet(tasklet22()).build();}@Beanpublic Step step23() {return stepBuilderFactory.get("step23").tasklet(tasklet23()).build();}@Beanpublic Step step3() {return stepBuilderFactory.get("step3").tasklet(tasklet3()).build();}@Beanpublic Tasklet tasklet1() {return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("tasklet1");return RepeatStatus.FINISHED;}};}@Beanpublic Tasklet tasklet21() {return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("tasklet21");return RepeatStatus.FINISHED;}};}@Beanpublic Tasklet tasklet22() {return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("tasklet22");return RepeatStatus.FINISHED;}};}@Beanpublic Tasklet tasklet23() {return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("tasklet23");return RepeatStatus.FINISHED;}};}@Beanpublic Tasklet tasklet3() {return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println("tasklet3");return RepeatStatus.FINISHED;}};}
}

在这里插入图片描述

四:步骤并行执行

默认步骤之间是使用同一个线程串行执行的,先执行第一个步骤,第一个步骤执行完后再执行第二个步骤,第二个步骤执行完后再执行第三个步骤。通过 split(TaskExecutor executor) 可以将步骤放在不同的线程中并行执行。并行执行哪个步骤先执行是不确定的。

@Bean
public Job helloWorldJob() {return jobBuilderFactory.get("helloWorldJob").start(hellWorldStep()).incrementer(new RunIdIncrementer()).split(new SimpleAsyncTaskExecutor()).add(hellWorldFlow()).end().build();
}@Bean
public Step hellWorldStep() {return stepBuilderFactory.get("hellWorldStep").tasklet(hellWorldTasklet()).build();
}@Bean
public Step hellWorldStep2() {return stepBuilderFactory.get("hellWorldStep2").tasklet(hellWorldTasklet2()).build();
}@Bean
public Flow hellWorldFlow() {return new FlowBuilder<Flow>("hellWorldFlow").start(hellWorldStep2()).build();
}@Bean
public Tasklet hellWorldTasklet() {return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println(Thread.currentThread().getName() + " hellWorldTasklet");return RepeatStatus.FINISHED;}};
}@Bean
public Tasklet hellWorldTasklet2() {return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println(Thread.currentThread().getName() + " hellWorldTasklet2");return RepeatStatus.FINISHED;}};
}

并行执行,Step2比Step1先执行了。
在这里插入图片描述

五:作业执行监听器 JobExecutionListener

作业执行监听器用于作业开始前执行某些操作,作业结束时执行某些操作。

注意:afterJob无论批处理任务成功还是失败都会被执行,所以要增加状态判断。

方式一:实现接口 JobExecutionListener

public class HelloWorldJobListener implements JobExecutionListener {@Overridepublic void beforeJob(JobExecution jobExecution) {jobExecution.getExecutionContext().put("begin", System.currentTimeMillis());// Status=STARTEDSystem.err.println("beforeJob " + jobExecution.getStatus());}@Overridepublic void afterJob(JobExecution jobExecution) {long begin = jobExecution.getExecutionContext().getLong("begin");long total = System.currentTimeMillis() - begin;if(jobExecution.getStatus() == BatchStatus.COMPLETED) {} else if (jobExecution.getStatus() == BatchStatus.FAILED) {}// Status=COMPLETEDSystem.err.println("afterJob " + jobExecution.getStatus() + " total: "+ total);}
}

方式二:注解 @BeforeJob + @AfterJob

public class HelloWorldJobAnnationListener {@BeforeJobpublic void beforeJob(JobExecution jobExecution) {jobExecution.getExecutionContext().put("begin", System.currentTimeMillis());// Status=STARTEDSystem.err.println("beforeJob " + jobExecution.getStatus());}@AfterJobpublic void afterJob(JobExecution jobExecution) {long begin = jobExecution.getExecutionContext().getLong("begin");long total = System.currentTimeMillis() - begin;// Status=COMPLETEDSystem.err.println("afterJob " + jobExecution.getStatus() + " total: "+ total);}
}

@Bean
public Job helloWorldJob() {return jobBuilderFactory.get("helloWorldJob").start(hellWorldStep())// 配置监听器.listener(jobExecutionListener()).build();
}@Bean
public JobExecutionListener jobExecutionListener() {return new HelloWorldJobListener();
}

六:步骤执行监听器

可以通过实现StepExecutionListener接口实现,也可以通过注解方式实现(@BeforeStep、@AfterStep)。

public class HelloWorldStepListener implements StepExecutionListener {@Overridepublic void beforeStep(StepExecution stepExecution) {System.out.println("步骤监听器:步骤执行之前监听," + stepExecution.getStepName());}@Overridepublic ExitStatus afterStep(StepExecution stepExecution) {System.out.println("步骤监听器:步骤执行之前监听," + stepExecution.getStepName());return stepExecution.getExitStatus();}
}
@Bean
public Step helloWorldStep() {return stepBuilderFactory.get("helloWorldStep").tasklet(hellWorldTasklet())// 配置步骤监听器.listener(stepExecutionListener()).build();
}@Bean
public StepExecutionListener stepExecutionListener() {return new HelloWorldStepListener();
}

七:Job嵌套

Job之间也可以嵌套,比如一个父Job封装多个已经存在的子Job。

@Configuration
public class ChildrenJobConfig {@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate StepBuilderFactory stepBuilderFactory;@Beanpublic Job childJob1() {return jobBuilderFactory.get("childJob1").start(childJob1Step()).incrementer(new RunIdIncrementer()).build();}@Beanpublic Step childJob1Step() {return stepBuilderFactory.get("childJob1Step").tasklet(childJob1StepTasklet()).build();}@Beanpublic Tasklet childJob1StepTasklet() {return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println(Thread.currentThread().getName() + " childJob1StepTasklet");return RepeatStatus.FINISHED;}};}@Beanpublic Job childJob2() {return jobBuilderFactory.get("childJob2").start(childJob2Step2()).incrementer(new RunIdIncrementer()).build();}@Beanpublic Step childJob2Step2() {return stepBuilderFactory.get("childJob2Step2").tasklet(childJob2Step2Tasklet2()).build();}@Beanpublic Tasklet childJob2Step2Tasklet2() {return new Tasklet() {@Overridepublic RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {System.out.println(Thread.currentThread().getName() + " childJob2Step2Tasklet2");return RepeatStatus.FINISHED;}};}
}
@Configuration
public class ParentJobConfig {@Autowiredprivate JobLauncher jobLauncher;@Autowiredprivate JobBuilderFactory jobBuilderFactory;@Autowiredprivate Job childJob1;@Autowiredprivate Job childJob2;@Beanpublic Job parentJob(JobRepository jobRepository, PlatformTransactionManager transactionManager) {return jobBuilderFactory.get("parentJob").start(parent1Step(jobRepository, transactionManager)).next(parent2Step(jobRepository, transactionManager)).incrementer(new RunIdIncrementer()).build();}@Beanpublic Step parent1Step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {// Job转Step: 将子Job封装成父Stepreturn new JobStepBuilder(new StepBuilder("parent1Step")).job(childJob1).launcher(jobLauncher).repository(jobRepository).transactionManager(transactionManager).build();}@Beanpublic Step parent2Step(JobRepository jobRepository, PlatformTransactionManager transactionManager) {// Job转Step: 将子Job封装成父Stepreturn new JobStepBuilder(new StepBuilder("parent2Step")).job(childJob2).launcher(jobLauncher).repository(jobRepository).transactionManager(transactionManager).build();}
}

在这里插入图片描述

八:轻舟已过万重山

在这里插入图片描述

小时候,觉得忘记带作业是天大的事
高中的时候,觉得考不上大学是天大的事
恋爱的时候,觉得和喜欢的 人分开是天大的事
大学毕业时,觉得没有一个稳定的工作是天大的事

但现在回头看看
自己曾经难以跨过的山其实都已经跨过了
曾认为不能接受的,也都渐渐接受了

生活充满了选择
遗憾也不过是常态
失败也是贯穿生活始终的

其实人本身就是无论做什么选择都会后悔
只是总是习惯美化自己当初没有选择的另外一条路
可是大家都心知肚明,就算时间重来一次,以当时的心智和阅历还是会做出同样的选择
那么故事的结局可能就没那么重要了

我想人生就是一场享受过程的修行
与其后悔当初
不如回头看
轻舟已过万重山
向前看
前路漫漫亦灿灿

这篇关于SpringBatch从入门到实战(二):HelloWorld的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/191517

相关文章

Spring Boot + MyBatis Plus 高效开发实战从入门到进阶优化(推荐)

《SpringBoot+MyBatisPlus高效开发实战从入门到进阶优化(推荐)》本文将详细介绍SpringBoot+MyBatisPlus的完整开发流程,并深入剖析分页查询、批量操作、动... 目录Spring Boot + MyBATis Plus 高效开发实战:从入门到进阶优化1. MyBatis

MyBatis 动态 SQL 优化之标签的实战与技巧(常见用法)

《MyBatis动态SQL优化之标签的实战与技巧(常见用法)》本文通过详细的示例和实际应用场景,介绍了如何有效利用这些标签来优化MyBatis配置,提升开发效率,确保SQL的高效执行和安全性,感... 目录动态SQL详解一、动态SQL的核心概念1.1 什么是动态SQL?1.2 动态SQL的优点1.3 动态S

Pandas使用SQLite3实战

《Pandas使用SQLite3实战》本文主要介绍了Pandas使用SQLite3实战,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学... 目录1 环境准备2 从 SQLite3VlfrWQzgt 读取数据到 DataFrame基础用法:读

Python实战之屏幕录制功能的实现

《Python实战之屏幕录制功能的实现》屏幕录制,即屏幕捕获,是指将计算机屏幕上的活动记录下来,生成视频文件,本文主要为大家介绍了如何使用Python实现这一功能,希望对大家有所帮助... 目录屏幕录制原理图像捕获音频捕获编码压缩输出保存完整的屏幕录制工具高级功能实时预览增加水印多平台支持屏幕录制原理屏幕

最新Spring Security实战教程之Spring Security安全框架指南

《最新SpringSecurity实战教程之SpringSecurity安全框架指南》SpringSecurity是Spring生态系统中的核心组件,提供认证、授权和防护机制,以保护应用免受各种安... 目录前言什么是Spring Security?同类框架对比Spring Security典型应用场景传统

最新Spring Security实战教程之表单登录定制到处理逻辑的深度改造(最新推荐)

《最新SpringSecurity实战教程之表单登录定制到处理逻辑的深度改造(最新推荐)》本章节介绍了如何通过SpringSecurity实现从配置自定义登录页面、表单登录处理逻辑的配置,并简单模拟... 目录前言改造准备开始登录页改造自定义用户名密码登陆成功失败跳转问题自定义登出前后端分离适配方案结语前言

OpenManus本地部署实战亲测有效完全免费(最新推荐)

《OpenManus本地部署实战亲测有效完全免费(最新推荐)》文章介绍了如何在本地部署OpenManus大语言模型,包括环境搭建、LLM编程接口配置和测试步骤,本文给大家讲解的非常详细,感兴趣的朋友一... 目录1.概况2.环境搭建2.1安装miniconda或者anaconda2.2 LLM编程接口配置2

Python FastAPI入门安装使用

《PythonFastAPI入门安装使用》FastAPI是一个现代、快速的PythonWeb框架,用于构建API,它基于Python3.6+的类型提示特性,使得代码更加简洁且易于绶护,这篇文章主要介... 目录第一节:FastAPI入门一、FastAPI框架介绍什么是ASGI服务(WSGI)二、FastAP

基于Canvas的Html5多时区动态时钟实战代码

《基于Canvas的Html5多时区动态时钟实战代码》:本文主要介绍了如何使用Canvas在HTML5上实现一个多时区动态时钟的web展示,通过Canvas的API,可以绘制出6个不同城市的时钟,并且这些时钟可以动态转动,每个时钟上都会标注出对应的24小时制时间,详细内容请阅读本文,希望能对你有所帮助...

Spring AI与DeepSeek实战一之快速打造智能对话应用

《SpringAI与DeepSeek实战一之快速打造智能对话应用》本文详细介绍了如何通过SpringAI框架集成DeepSeek大模型,实现普通对话和流式对话功能,步骤包括申请API-KEY、项目搭... 目录一、概述二、申请DeepSeek的API-KEY三、项目搭建3.1. 开发环境要求3.2. mav