博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Spring-batch (ItemProcessor) 数据处理过程
阅读量:6307 次
发布时间:2019-06-22

本文共 8966 字,大约阅读时间需要 29 分钟。

Spring-batch学习总结(五)

学习目标:掌握ItemProcessor
1.ItemProcessor:spring-batch中数据处理的过程
2.ItemProcessor主要用于实现业务逻辑,验证,过滤,等
3.Spring-batch为我们提供ItemProcessor<I,O>这个接口,它包含一个方法O process(I item
4.我们用代码进行演示:
例:我们读取数据库表person_buf中的数据,将其id为奇数的数据剔除,将读出name进行字母大写转换
首先观察数据库表数据结构:
Spring-batch (ItemProcessor) 数据处理过程

代码:

Person

package com.dhcc.batch.batchDemo.processor;import java.util.Date;public class Person {    private Integer id;    private String name;    private String perDesc;    private Date createTime;    private Date updateTime;    private String sex;    private Float score;    private Double price;    public Person() {        super();    }    public Person(Integer id, String name, String perDesc, Date createTime, Date updateTime, String sex, Float score,            Double price) {        super();        this.id = id;        this.name = name;        this.perDesc = perDesc;        this.createTime = createTime;        this.updateTime = updateTime;        this.sex = sex;        this.score = score;        this.price = price;    }    public Integer getId() {        return id;    }    public void setId(Integer id) {        this.id = id;    }    public String getName() {        return name;    }    public void setName(String name) {        this.name = name;    }    public Date getCreateTime() {        return createTime;    }    public String getPerDesc() {        return perDesc;    }    public void setPerDesc(String perDesc) {        this.perDesc = perDesc;    }    public void setCreateTime(Date createTime) {        this.createTime = createTime;    }    public Date getUpdateTime() {        return updateTime;    }    public void setUpdateTime(Date updateTime) {        this.updateTime = updateTime;    }    public String getSex() {        return sex;    }    public void setSex(String sex) {        this.sex = sex;    }    public Float getScore() {        return score;    }    public void setScore(Float score) {        this.score = score;    }    public Double getPrice() {        return price;    }    public void setPrice(Double price) {        this.price = price;    }    @Override    public String toString() {        return "Person [id=" + id + ", name=" + name + ", perDesc=" + perDesc + ", createTime=" + createTime + ", updateTime="                + updateTime + ", sex=" + sex + ", score=" + score + ", price=" + price + "]";    }}

PersonLineAggregator

package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.file.transform.LineAggregator;import com.fasterxml.jackson.core.JsonProcessingException;import com.fasterxml.jackson.databind.ObjectMapper;public class PersonLineAggregator implements LineAggregator
{ //JSON private ObjectMapper mapper=new ObjectMapper(); @Override public String aggregate(Person person) { try { return mapper.writeValueAsString(person); } catch (JsonProcessingException e) { throw new RuntimeException("unable to writer...",e); } }}

PersonRowMapper

package com.dhcc.batch.batchDemo.processor;import java.sql.ResultSet;import java.sql.SQLException;import org.springframework.jdbc.core.RowMapper;/** * 实现将数据库中的每条数据映射到Person对象中 * @author Administrator * */public class PersonRowMapper implements RowMapper
{ /** * rs一条结果集,rowNum代表当前行 */ @Override public Person mapRow(ResultSet rs, int rowNum) throws SQLException { return new Person(rs.getInt("id") ,rs.getString("name") ,rs.getString("per_desc") ,rs.getDate("create_time") ,rs.getDate("update_time") ,rs.getString("sex") ,rs.getFloat("score") ,rs.getDouble("price")); }}

ProcessorFileApplication

package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;import org.springframework.boot.SpringApplication;import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication@EnableBatchProcessingpublic class ProcessorFileApplication {    public static void main(String[] args) {        SpringApplication.run(ProcessorFileApplication.class, args);    }}

ProcessorFileOutputFromDBConfiguration

package com.dhcc.batch.batchDemo.processor;import java.io.File;import java.util.ArrayList;import java.util.HashMap;import java.util.List;import java.util.Map;import javax.sql.DataSource;import org.springframework.batch.core.Job;import org.springframework.batch.core.Step;import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;import org.springframework.batch.core.configuration.annotation.StepScope;import org.springframework.batch.item.ItemProcessor;import org.springframework.batch.item.database.JdbcPagingItemReader;import org.springframework.batch.item.database.Order;import org.springframework.batch.item.database.support.MySqlPagingQueryProvider;import org.springframework.batch.item.file.FlatFileItemWriter;import org.springframework.batch.item.support.CompositeItemProcessor;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.core.io.FileSystemResource;@Configurationpublic class ProcessorFileOutputFromDBConfiguration {    @Autowired    private JobBuilderFactory jobBuilderFactory;    @Autowired    private StepBuilderFactory stepBuilderFactory;    @Autowired    private DataSource dataSource;    @Autowired    private ItemProcessor
fristNameUpperCaseProcessor; @Autowired private ItemProcessor
idFilterProcessor; @Bean public Job ProcessorFileOutputFromDBJob() { return jobBuilderFactory.get("ProcessorFileOutputFromDBJob") .start(ProcessorFileOutputFromDBStep()) .build(); } @Bean public Step ProcessorFileOutputFromDBStep() { return stepBuilderFactory.get("ProcessorFileOutputFromDBStep") .
chunk(100) .reader(ProcessorFileOutputFromItemWriter()) .processor(personDataProcessor()) .writer(ProcessorFileOutputFromItemReader()) .build(); } @Bean @StepScope public JdbcPagingItemReader
ProcessorFileOutputFromItemWriter() { JdbcPagingItemReader
reader = new JdbcPagingItemReader<>(); reader.setDataSource(this.dataSource); // 设置数据源 reader.setFetchSize(100); // 设置一次最大读取条数 reader.setRowMapper(new PersonRowMapper()); // 把数据库中的每条数据映射到AlipaytranDo对像中 MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider(); queryProvider.setSelectClause("id,name,per_desc,create_time,update_time,sex,score,price"); // 设置查询的列 queryProvider.setFromClause("from person_buf"); // 设置要查询的表 Map
sortKeys = new HashMap
();// 定义一个集合用于存放排序列 sortKeys.put("id", Order.ASCENDING);// 按照升序排序 queryProvider.setSortKeys(sortKeys); reader.setQueryProvider(queryProvider);// 设置排序列 return reader; } @Bean public CompositeItemProcessor
personDataProcessor(){ CompositeItemProcessor
processor=new CompositeItemProcessor<>(); List
> listProcessor=new ArrayList<>(); listProcessor.add(fristNameUpperCaseProcessor); listProcessor.add(idFilterProcessor); processor.setDelegates(listProcessor); return processor; } @Bean @StepScope public FlatFileItemWriter
ProcessorFileOutputFromItemReader() { FlatFileItemWriter
writer = new FlatFileItemWriter
(); try { File path = new File("D:" + File.separator + "newPerson.json").getAbsoluteFile(); System.out.println("file is create in :" + path); writer.setResource(new FileSystemResource(path)); writer.setLineAggregator(new PersonLineAggregator()); writer.afterPropertiesSet(); } catch (Exception e) { e.printStackTrace(); } return writer; }}

FristNameUpperCaseProcessor

package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.ItemProcessor;import org.springframework.stereotype.Component;@Componentpublic class FristNameUpperCaseProcessor implements ItemProcessor
{ @Override public Person process(Person item) throws Exception { return new Person(item.getId(), item.getName().toUpperCase(), item.getPerDesc(), item.getCreateTime(), item.getUpdateTime(), item.getSex(), item.getScore(), item.getPrice()); }}

IdFilterProcessor

package com.dhcc.batch.batchDemo.processor;import org.springframework.batch.item.ItemProcessor;import org.springframework.stereotype.Component;@Componentpublic class IdFilterProcessor implements ItemProcessor
{ @Override public Person process(Person item) throws Exception { if (item.getId() % 2 == 0) { return item; } else { return null; } }}

运行结果:

Spring-batch (ItemProcessor) 数据处理过程
观察写入完成后的文件:
Spring-batch (ItemProcessor) 数据处理过程
可以看出我们已经完成了我们的目标

转载于:https://blog.51cto.com/13501268/2298950

你可能感兴趣的文章
天猫高管全面解读大快消2018新零售打法
查看>>
idea springboot热部署无效问题
查看>>
第八章 进程间通信
查看>>
HttpSession接口中的方法(Jsp中的session类的用法)
查看>>
「镁客早报」AI可预测心脏病人死亡时间;机器人开始在美国送外卖
查看>>
MoQ(基于.net3.5,c#3.0的mock框架)简单介绍
查看>>
物联网全面升级,十年内推动工业进入智能化新阶段
查看>>
spring-通过ListFactory注入List
查看>>
一种基于SDR实现的被动GSM嗅探
查看>>
阿里云ECS每天一件事D1:配置SSH
查看>>
SQL Server 性能调优(性能基线)
查看>>
uva 10801 - Lift Hopping(最短路Dijkstra)
查看>>
[Java Web]servlet/filter/listener/interceptor区别与联系
查看>>
POJ 2312Battle City(BFS-priority_queue 或者是建图spfa)
查看>>
从零开始学MVC3——创建项目
查看>>
CentOS 7 巨大变动之 firewalld 取代 iptables
查看>>
延时任务和定时任务
查看>>
linux下的权限问题
查看>>
教你如何使用Flutter和原生App混合开发
查看>>
Spring Boot 整合redis
查看>>