Spring Batch是个轻量级的、 完善的批处理框架,旨在帮助企业建立健壮、高效的批处理应用。Spring Batch是Spring的一个子项目,使用Java语言并基于Spring框架为基础开发,使得已经使用Spring框架的开发者或者企业更容易访问和利用企业服务。
Spring Batch提供了大量可重用的组件,包括了日志追踪、事务、任务作业统计、任务重启、跳过、重复资源管理。对于大数据量和高性能的批处理任 务,Spring Batch同样提供了高级功能和特性来支持比如分区功能、远程功能。总之,通过Spring Batch能够支持简单的、复杂的和大数据量的批处理作业。
Spring Batch是一个批处理应用框架,不是调度框架,但需要和调度框架合作来构建完成的批处理任务。它只关注批处理任务相关的问题,如事务、并发、监控、执行等,并不提供相应的调度功能。如果需要使用调用框架,在商业软件和开源软件中已经有很多优秀的企业级调度框架(如Quartz. Tivoli、 Control-M、 Cron等)可以使用。
框架主要有以下功能: Transaction management (事务管理) Chunk based processing (基于块的处理) Declarative 1/0 (声明式的输入输出) Start/Stop/Restart (启动/停止/再启动) Retry/Skip (重试/跳过)
Spring Batch是一个开源的批量处理框架,Spring Batch提供了类和API来读写资源,管理事务,作业处理统计、重启、以及分区技术处理大量数据。在Spring Batch中,一个作业任务可以由多个step组成,每个任务又都可以分为Read-Process-Write或者是tasklet



框架一共有4个主要角色:
- JobLauncher是任务启动器,通过它来启动任务,可以看做是程序的入口。
- Job代表着一个具体的任务。
- Step代表着一个具体的步骤,一个Job可以包含多个Step (想象把大象放进冰箱这个任务需要多少个步骤你就明白了) .
- JobRepository是存储数据的地方,可以看做是一个数据库的接口,在任务执行的时候需要通过它来记录任务状态等等信息。
- 对于“Read-Process-Write”过程,它是指从资源(csv、xml或数据库)中“读取”数据,“处理”它并“写入”它到其他资源(csv、xml和数据库)。例如,步骤可以从CSV文件中读取数据,对其进行处理并将其写入数据库。Spring Batch提供了许多定制类来读/写CSV、XML和数据库。
- 对于“单个”操作任务(tasklet),它意味着只执行单个任务,比如在步骤启动或完成之后或之前清理资源。
- 这些步骤可以链接在一起作为作业运行。
SpringBatch入门程序
1.创建一个SpringBoot项目,SpringBatch依赖,如下:
<?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 7 <groupId>com.test</groupId> 8 <artifactId>test-springboot-batch</artifactId> 9 <version>1.0-SNAPSHOT</version> 10 11 <parent> 12 <groupId>org.springframework.boot</groupId> 13 <artifactId>spring-boot-starter-parent</artifactId> 14 <version>2.2.5.RELEASE</version> 15 <relativePath/> <!-- lookup parent from repository --> 16 </parent> 17 18 <properties> 19 <maven.compiler.source>8</maven.compiler.source> 20 <maven.compiler.target>8</maven.compiler.target> 21 </properties> 22 23 <dependencies> 24 <dependency> 25 <groupId>org.springframework.boot</groupId> 26 <artifactId>spring-boot-starter-web</artifactId> 27 </dependency> 28 29 <dependency> 30 <groupId>org.springframework.boot</groupId> 31 <artifactId>spring-boot-starter-batch</artifactId> 32 </dependency> 33 34 <!-- mysql --> 35 <dependency> 36 <groupId>mysql</groupId> 37 <artifactId>mysql-connector-java</artifactId> 38 <version>8.0.12</version> 39 </dependency> 40 </dependencies> 41 42 <build> 43 <plugins> 44 <plugin> 45 <groupId>org.springframework.boot</groupId> 46 <artifactId>spring-boot-maven-plugin</artifactId> 47 </plugin> 48 </plugins> 49 </build> 50 51 </project>
2.编辑配置文件,这里要使用mysql数据库,任务信息持久化到数据库中
spring:
datasource:
username: root
password: 123456
url: jdbc:mysql://127.0.0.1:3306/test_springbatch?allowPublicKeyRetrieval=true&useSSL=true
driver-class-name: com.mysql.cj.jdbc.Driver
# 初始化数据库,文件在依赖jar包中
schema: classpath:org/springframework/batch/core/schema-mysql.sql
initialization-mode: always
3.编辑主启动类
@SpringBootApplication
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class);
}
}
4.配置Job
package com.test.config;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @author H__D
* @description
* @date 2021/10/30
*/
@Configuration
// 启用批处理功能
@EnableBatchProcessing
public class JobConfiguration {
// 注入创建任务的对象
@Autowired
private JobBuilderFactory jobBuilderFactory;
// 注入创建步骤的对象
@Autowired
private StepBuilderFactory stepBuilderFactory;
@Bean
public Job helloworldJob() {
return jobBuilderFactory.get("helloworldJob")
.start(step1())
.build();
}
@Bean
public Step step1() {
return stepBuilderFactory.get("step1")
.tasklet(new Tasklet() {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
System.out.println(Thread.currentThread().getName() + "------" + "hello world");
// 返回执行完成状态
return RepeatStatus.FINISHED;
}
}).build();
}
}
5.运行启动类


另一个小例子
1.项目依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<scope>runtime</scope>
</dependency>
2.编写一个简单的Tasklet
public class MessageTasklet implements Tasklet {
@Override
public RepeatStatus execute(StepContribution stepContribution, ChunkContext chunkContext) throws Exception {
String message = (String) chunkContext.getStepContext().getJobParameters().get("message");
ExecutionContext jobContext = chunkContext.getStepContext().getStepExecution().getJobExecution().getExecutionContext();
jobContext.put("message", message);
//打印传入的参数
System.out.println(message);
return RepeatStatus.FINISHED;
}
}
3.Job配置
@Configuration
public class TaskletJobConfiguration {
@Autowired
private JobBuilderFactory jobs;
@Autowired
private StepBuilderFactory steps;
@Bean
public Job taskletJob() {
return this.jobs.get("taskletJob")
.start(step())
.build();
}
@Bean
protected Step step() {
return steps
.get("step")
.tasklet(messageTasklet())
.build();
}
@Bean
public MessageTasklet messageTasklet() {
MessageTasklet tasklet = new MessageTasklet();
return tasklet;
}
}
4.参数配置
#初始化Spring Batch 数据表 spring.batch.initialize-schema=always #工程启动时不执行任务 spring.batch.job.enabled=false spring.datasource.url=jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull spring.datasource.username=username spring.datasource.password=password spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver spring.datasource.type=com.zaxxer.hikari.HikariDataSource spring.datasource.hikari.pool-name=HikariPool #最大连接数,小于等于0会被重置为默认值10;大于零小于1会被重置为minimum-idle的值 spring.datasource.hikari.maximum-pool-size=10 #连接超时时间:毫秒,小于250毫秒,否则被重置为默认值30秒 spring.datasource.hikari.connection-timeout=60000 #最小空闲连接,默认值10,小于0或大于maximum-pool-size,都会重置为maximum-pool-size spring.datasource.hikari.minimum-idle=10 #空闲连接超时时间,默认值600000(10分钟),大于等于max-lifetime且max-lifetime>0,会被重置为0;不等于0且小于10秒,会被重置为10秒。 # 只有空闲连接数大于最大连接数且空闲时间超过该值,才会被释放 spring.datasource.hikari.idle-timeout=500000 #连接最大存活时间.不等于0且小于30秒,会被重置为默认值30分钟.设置应该比mysql设置的超时时间短 spring.datasource.hikari.max-lifetime=540000 #连接测试查询 spring.datasource.hikari.connection-test-query=SELECT 1
5.接口测试,最后不要忘记在启动类上加上注解@EnableBatchProcessing
@RestController
@Slf4j
public class JobLauncherController {
@Autowired
JobLauncher jobLauncher;
@Autowired
Job job;
@RequestMapping("/launchjob")
public String handle() throws Exception {
String parameter = UUID.randomUUID().toString();
try {
//接口每次都重新生成一个UUID,如果参数完全相同,日志会提示任务已经执行成功,不能重复执行
JobParameters jobParameters = new JobParametersBuilder().addString("message", "Welcome To Spring Batch World!" + parameter)
.toJobParameters();
jobLauncher.run(job, jobParameters);
} catch (Exception e) {
log.error("", e);
}
return parameter;
}
}
6.Job拦截器,实现JobExecutionListener接口后在Job配置的地方增加一下listener即可,如下:
@Component
public class InterceptingJobExecution implements JobExecutionListener {
@Override
public void beforeJob(JobExecution jobExecution) {
System.out.println("Intercepting Job Execution - Before Job!");
}
@Override
public void afterJob(JobExecution jobExecution) {
System.out.println("Intercepting Job Execution - after Job!");
}
}