本文共 8966 字,大约阅读时间需要 29 分钟。
Spring-batch学习总结(五)
学习目标:掌握ItemProcessor1.ItemProcessor:spring-batch中数据处理的过程2.ItemProcessor主要用于实现业务逻辑,验证,过滤,等3.Spring-batch为我们提供ItemProcessor<I,O>这个接口,它包含一个方法O process(I item4.我们用代码进行演示:例:我们读取数据库表person_buf中的数据,将其id为奇数的数据剔除,将读出name进行字母大写转换首先观察数据库表数据结构:代码:
Personpackage com.dhcc.batch.batchDemo.processor;import java.util.Date;public class Person { private Integer id; private String name; private String perDesc; private Date createTime; private Date updateTime; private String sex; private Float score; private Double price; public Person() { super(); } public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score, Double price) { super(); this.id = id; this.name = name; this.perDesc = perDesc; this.createTime = createTime; this.updateTime = updateTime; this.sex = sex; this.score = score; this.price = price; } public Integer getId() { return id; } public void setId(Integer id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public Date getCreateTime() { return createTime; } public String getPerDesc() { return perDesc; } public void setPerDesc(String perDesc) { this.perDesc = perDesc; } public void setCreateTime(Date createTime) { this.createTime = createTime; } public Date getUpdateTime() { return updateTime; } public void setUpdateTime(Date updateTime) { this.updateTime = updateTime; } public String getSex() { return sex; } public void setSex(String sex) { this.sex = sex; } public Float getScore() { return score; } public void setScore(Float score) { this.score = score; } public Double getPrice() { return price; } public void setPrice(Double price) { this.price = price; } @Override public String toString() { return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime + ", updateTime=" + updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]"; }}
PersonLineAggregator
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.file.transform.LineAggregator;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;public class PersonLineAggregator implements LineAggregator{ //JSON private ObjectMapper mapper=new ObjectMapper(); @Override public String aggregate(Person person) { try { return mapper.writeValueAsString(person); } catch (JsonProcessingException e) { throw new RuntimeException("unable to writer...",e); } }}
PersonRowMapper
package com.dhcc.batch.batchDemo.processor;import java.sql.ResultSet;import java.sql.SQLException;import org.springframework.jdbc.core.RowMapper;/** * 实现将数据库中的每条数据映射到Person对象中 * @author Administrator * */public class PersonRowMapper implements RowMapper{ /** * rs一条结果集,rowNum代表当前行 */ @Override public Person mapRow(ResultSet rs, int rowNum) throws SQLException { return new Person(rs.getInt("id") ,rs.getString("name") ,rs.getString("per_desc") ,rs.getDate("create_time") ,rs.getDate("update_time") ,rs.getString("sex") ,rs.getFloat("score") ,rs.getDouble("price")); }}
ProcessorFileApplication
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication@EnableBatchProcessingpublic class ProcessorFileApplication { public static void main(String[] args) { SpringApplication.run(ProcessorFileApplication.class, args); }}
ProcessorFileOutputFromDBConfiguration
package com.dhcc.batch.batchDemo.processor;import java.io.File;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import javax.sql.DataSource;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepScope;import org.springframework.batch.item.ItemProcessor;import org.springframework.batch.item.database.JdbcPagingItemReader;import org.springframework.batch.item.database.Order;import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;import org.springframework.batch.item.file.FlatFileItemWriter;import org.springframework.batch.item.support.CompositeItemProcessor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.FileSystemResource;@Configurationpublic class ProcessorFileOutputFromDBConfiguration { @Autowired private JobBuilderFactory jobBuilderFactory; @Autowired private StepBuilderFactory stepBuilderFactory; @Autowired private DataSource dataSource; @Autowired private ItemProcessorfristNameUpperCaseProcessor; @Autowired private ItemProcessor idFilterProcessor; @Bean public Job ProcessorFileOutputFromDBJob() { return jobBuilderFactory.get("ProcessorFileOutputFromDBJob") .start(ProcessorFileOutputFromDBStep()) .build(); } @Bean public Step ProcessorFileOutputFromDBStep() { return stepBuilderFactory.get("ProcessorFileOutputFromDBStep") . chunk(100) .reader(ProcessorFileOutputFromItemWriter()) .processor(personDataProcessor()) .writer(ProcessorFileOutputFromItemReader()) .build(); } @Bean @StepScope public JdbcPagingItemReader ProcessorFileOutputFromItemWriter() { JdbcPagingItemReader reader = new JdbcPagingItemReader<>(); reader.setDataSource(this.dataSource); // 设置数据源 reader.setFetchSize(100); // 设置一次最大读取条数 reader.setRowMapper(new PersonRowMapper()); // 把数据库中的每条数据映射到AlipaytranDo对像中 MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider(); queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // 设置查询的列 queryProvider.setFromClause("from person_buf"); // 设置要查询的表 Map sortKeys = new HashMap ();// 定义一个集合用于存放排序列 sortKeys.put("id", Order.ASCENDING);// 按照升序排序 queryProvider.setSortKeys(sortKeys); reader.setQueryProvider(queryProvider);// 设置排序列 return reader; } @Bean public CompositeItemProcessor personDataProcessor(){ CompositeItemProcessor processor=new CompositeItemProcessor<>(); List > listProcessor=new ArrayList<>(); listProcessor.add(fristNameUpperCaseProcessor); listProcessor.add(idFilterProcessor); processor.setDelegates(listProcessor); return processor; } @Bean @StepScope public FlatFileItemWriter ProcessorFileOutputFromItemReader() { FlatFileItemWriter writer = new FlatFileItemWriter (); try { File path = new File("D:" + File.separator + "newPerson.json").getAbsoluteFile(); System.out.println("file is create in :" + path); writer.setResource(new FileSystemResource(path)); writer.setLineAggregator(new PersonLineAggregator()); writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; }}
FristNameUpperCaseProcessor
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.ItemProcessor;import org.springframework.stereotype.Component;@Componentpublic class FristNameUpperCaseProcessor implements ItemProcessor{ @Override public Person process(Person item) throws Exception { return new Person(item.getId(), item.getName().toUpperCase(), item.getPerDesc(), item.getCreateTime(), item.getUpdateTime(), item.getSex(), item.getScore(), item.getPrice()); }}
IdFilterProcessor
package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.ItemProcessor;import org.springframework.stereotype.Component;@Componentpublic class IdFilterProcessor implements ItemProcessor{ @Override public Person process(Person item) throws Exception { if (item.getId() % 2 == 0) { return item; } else { return null; } }}
运行结果:
观察写入完成后的文件:可以看出我们已经完成了我们的目标转载于:https://blog.51cto.com/13501268/2298950