纯净、安全、绿色的下载网站
Spring Batch
的分层架构图如下:
可以看到它分为三层分别是:
Application
应用层:包含了所有任务batch jobs
和开发人员自定义的代码主要是根据项目需要开发的业务流程等Batch Core
核心层:包含启动和管理任务的运行环境类如JobLauncher
等Batch Infrastructure
基础层:上面两层是建立在基础层之上的包含基础的读入reader
和写出writer
、重试框架等理解下图所涉及的概念至关重要不然很难进行后续开发和问题分析
专门负责与数据库打交道对整个批处理的新增、更新、执行进行记录所以Spring Batch
是需要依赖数据库来管理的
负责启动任务Job
Job
是封装整个批处理过程的单位跑一个批处理任务就是跑一个Job
所定义的内容
上图介绍了Job
的一些相关概念:
Job
:封装处理实体定义过程逻辑JobInstance
:Job
的运行实例不同的实例参数不同所以定义好一个Job
后可以通过不同参数运行多次JobParameters
:与JobInstance
相关联的参数JobExecution
:代表Job
的一次实际执行可能成功、可能失败所以开发人员要做的事情就是定义Job
Step
是对Job
某个过程的封装一个Job
可以包含一个或多个Step
一步步的Step
按特定逻辑执行才代表Job
执行完成
通过定义Step
来组装Job
可以更灵活地实现复杂的业务逻辑
所以定义一个Job
关键是定义好一个或多个Step
然后把它们组装好即可而定义Step
有多种方法但有一种常用的模型就是输入——处理——输出
即Item Reader
、Item Processor
和Item Writer
比如通过Item Reader
从文件输入数据然后通过Item Processor
进行业务处理和数据转换最后通过Item Writer
写到数据库中去
Spring Batch
为我们提供了许多开箱即用的Reader
和Writer
非常方便
理解了基本概念后就直接通过代码来感受一下吧整个项目的功能是从多个csv
文件中读数据处理后输出到一个csv
文件
添加依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-batch</artifactId> </dependency> <dependency> <groupId>com.h2database</groupId> <artifactId>h2</artifactId> <scope>runtime</scope> </dependency>
需要添加Spring Batch
的依赖同时使用H2
作为内存数据库比较方便实际生产肯定是要使用外部的数据库如Oracle
、PostgreSQL
入口主类:
@SpringBootApplication @EnableBatchProcessing public class PkslowBatchJobMain { public static void main(String[] args) { SpringApplication.run(PkslowBatchJobMain.class, args); } }
也很简单只是在Springboot
的基础上添加注解@EnableBatchProcessing
领域实体类Employee
:
package com.pkslow.batch.entity; public class Employee { String id; String firstName; String lastName; }
对应的csv
文件内容如下:
id,firstName,lastName
1,Lokesh,Gupta
2,Amit,Mishra
3,Pankaj,Kumar
4,David,Miller
因为有多个输入文件所以定义如下:
@Value("input/inputData*.csv") private Resource[] inputResources; @Bean public MultiResourceItemReader<Employee> multiResourceItemReader() { MultiResourceItemReader<Employee> resourceItemReader = new MultiResourceItemReader<Employee>(); resourceItemReader.setResources(inputResources); resourceItemReader.setDelegate(reader()); return resourceItemReader; } @Bean public FlatFileItemReader<Employee> reader() { FlatFileItemReader<Employee> reader = new FlatFileItemReader<Employee>(); //跳过csv文件第一行为表头 reader.setLinesToSkip(1); reader.setLineMapper(new DefaultLineMapper() { { setLineTokenizer(new DelimitedLineTokenizer() { { //字段名 setNames(new String[] { "id", "firstName", "lastName" }); } }); setFieldSetMapper(new BeanWrapperFieldSetMapper<Employee>() { { //转换化后的目标类 setTargetType(Employee.class); } }); } }); return reader; }
这里使用了FlatFileItemReader
方便我们从文件读取数据
为了简单演示处理很简单就是把最后一列转为大写:
public ItemProcessor<Employee, Employee> itemProcessor() { return employee -> { employee.setLastName(employee.getLastName().toUpperCase()); return employee; }; }
比较简单代码及注释如下:
private Resource outputResource = new FileSystemResource("output/outputData.csv"); @Bean public FlatFileItemWriter<Employee> writer() { FlatFileItemWriter<Employee> writer = new FlatFileItemWriter<>(); writer.setResource(outputResource); //是否为追加模式 writer.setAppendAllowed(true); writer.setLineAggregator(new DelimitedLineAggregator<Employee>() { { //设置分割符 setDelimiter(","); setFieldExtractor(new BeanWrapperFieldExtractor<Employee>() { { //设置字段 setNames(new String[] { "id", "firstName", "lastName" }); } }); } }); return writer; }
有了Reader-Processor-Writer
后就可以定义Step
了:
@Bean public Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .processor(itemProcessor()) .writer(writer()) .build(); }
这里有一个chunk
的设置值为5
意思是5条记录后再提交输出可以根据自己需求定义
完成了Step
的编码定义Job
就容易了:
@Bean public Job pkslowCsvJob() { return jobBuilderFactory .get("pkslowCsvJob") .incrementer(new RunIdIncrementer()) .start(csvStep()) .build(); }
完成以上编码后执行程序结果如下:
成功读取数据并将最后字段转为大写并输出到outputData.csv
文件
可以通过Listener
接口对特定事件进行监听以实现更多业务功能比如如果处理失败就记录一条失败日志处理完成就通知下游拿数据等
我们分别对Read
、Process
和Write
事件进行监听对应分别要实现ItemReadListener
接口、ItemProcessListener
接口和ItemWriteListener
接口因为代码比较简单就是打印一下日志这里只贴出ItemWriteListener
的实现代码:
public class PkslowWriteListener implements ItemWriteListener<Employee> { private static final Log logger = LogFactory.getLog(PkslowWriteListener.class); @Override public void beforeWrite(List<? extends Employee> list) { logger.info("beforeWrite: " + list); } @Override public void afterWrite(List<? extends Employee> list) { logger.info("afterWrite: " + list); } @Override public void onWriteError(Exception e, List<? extends Employee> list) { logger.info("onWriteError: " + list); } }
把实现的监听器listener
整合到Step
中去:
@Bean public Step csvStep() { return stepBuilderFactory.get("csvStep").<Employee, Employee>chunk(5) .reader(multiResourceItemReader()) .listener(new PkslowReadListener()) .processor(itemProcessor()) .listener(new PkslowProcessListener()) .writer(writer()) .listener(new PkslowWriteListener()) .build(); }
执行后看一下日志:
这里就能明显看到之前设置的chunk
的作用了Writer
每次是处理5条记录如果一条输出一次会对IO
造成压力
python 操作sqlite数据库 python 操作sqlite数据库的方法