How to Create a Custom ItemReader in Spring Batch
Custom ItemReader in Spring Batch
Spring Batch supports a variety of input sources out of the box, such as files and databases. Sometimes, however, you need to read from a source that is not supported by default, and in that case you have to implement your own data source: a custom ItemReader. This article walks through an example of how to do that.
Creating a custom ItemReader
Creating a custom ItemReader takes two steps:
- Create a class that implements the ItemReader interface, supplying the type of object it returns as the type parameter T.
- Implement the interface's T read() method according to the following rule:
read() returns the next object if there is one, and null otherwise.
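For reference, the interface being implemented is tiny; the sketch below shows it roughly as declared in org.springframework.batch.item (Spring Batch 4.x; exact annotations may differ by version):

```java
public interface ItemReader<T> {

    // Returns the next item, or null when the input is exhausted.
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}
```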
Next we create a custom ItemReader that returns StuDto objects describing the students of an online testing course. To keep things simple, the data is stored in memory. StuDto is a plain data transfer object:
```java
@Data
public class StuDto {

    private String emailAddress;
    private String name;
    private String purchasedPackage;
}
```
Create the ItemReader by following these steps:
- Create a class named InMemoryStudentReader.
- Implement the ItemReader interface with StuDto as the type parameter.
- Add a List<StuDto> studentData field that holds the students enrolled in the course.
- Add a nextStudentIndex field that holds the index of the next StuDto object.
- Add a private initialize() method that populates the student data and sets the index to 0.
- Add a constructor that calls initialize().
- Implement read() with the following rule: if there is another student, return that StuDto object and increment the index; otherwise return null.
The InMemoryStudentReader code looks like this:
```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.springframework.batch.item.ItemReader;

public class InMemoryStudentReader implements ItemReader<StuDto> {

    private int nextStudentIndex;
    private List<StuDto> studentData;

    InMemoryStudentReader() {
        initialize();
    }

    private void initialize() {
        StuDto tony = new StuDto();
        tony.setEmailAddress("[email protected]");
        tony.setName("Tony Tester");
        tony.setPurchasedPackage("master");

        StuDto nick = new StuDto();
        nick.setEmailAddress("[email protected]");
        nick.setName("Nick Newbie");
        nick.setPurchasedPackage("starter");

        StuDto ian = new StuDto();
        ian.setEmailAddress("[email protected]");
        ian.setName("Ian Intermediate");
        ian.setPurchasedPackage("intermediate");

        studentData = Collections.unmodifiableList(Arrays.asList(tony, nick, ian));
        nextStudentIndex = 0;
    }

    @Override
    public StuDto read() throws Exception {
        StuDto nextStudent = null;
        if (nextStudentIndex < studentData.size()) {
            nextStudent = studentData.get(nextStudentIndex);
            nextStudentIndex++;
        }
        return nextStudent;
    }
}
```
Once the custom ItemReader exists, it has to be registered as a bean so that a Spring Batch job can use it. The configuration is shown next.
Configuring the ItemReader bean
The configuration class looks like this:
```java
@Configuration
public class InMemoryStudentJobConfig {

    @Bean
    ItemReader<StuDto> inMemoryStudentReader() {
        return new InMemoryStudentReader();
    }
}
```
The class is annotated with @Configuration to mark it as a configuration class, a method returning ItemReader<StuDto> is added and annotated with @Bean, and the method body simply returns a new InMemoryStudentReader instance.
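The bean alone does not run anything; the reader still has to be wired into a chunk-oriented step. The sketch below shows what could be added to the same configuration class. It is only illustrative: the studentWriter bean, the step and job names, and the chunk size of 10 are assumptions, not part of the original example (the style matches the JobBuilderFactory/StepBuilderFactory examples later in this article).

```java
// Hypothetical additions to InMemoryStudentJobConfig; assumes @EnableBatchProcessing
// is active and that JobBuilderFactory / StepBuilderFactory are autowired into the class.
@Bean
public ItemWriter<StuDto> studentWriter() {
    // Stand-in writer that simply prints each student.
    return items -> items.forEach(System.out::println);
}

@Bean
public Step studentStep() {
    return stepBuilderFactory.get("studentStep")
            .<StuDto, StuDto>chunk(10)          // read and write 10 students per chunk
            .reader(inMemoryStudentReader())
            .writer(studentWriter())
            .build();
}

@Bean
public Job studentJob() {
    return jobBuilderFactory.get("studentJob")
            .start(studentStep())
            .build();
}
```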
Summary
This article showed how to create a custom ItemReader with an example. The key points are:
- A custom ItemReader implements the ItemReader interface.
- The type of object the reader returns is supplied as the type parameter T.
- The read() method returns the next object if there is one, and null otherwise.
Spring Batch ItemReader
This part focuses on ItemReader: how to read data from different data sources, and how exception handling and the restart mechanism work.
JdbcPagingItemReader
Reads data from a database.
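All of the reader examples below map records into a Customer object that the original text does not show. A minimal sketch of it, assuming Lombok's @Data and @Builder are used (the field names and types are inferred from the mapping code in the examples):

```java
import lombok.Builder;
import lombok.Data;

// Inferred from the rs.getLong("id") / fieldSet.readString(...) calls in the readers below;
// birthdate is kept as a plain String for simplicity.
@Data
@Builder
public class Customer {

    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}
```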
```java
@Configuration
public class DBJdbcDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("dbJdbcDemoWriter")
    private ItemWriter<? super Customer> dbJdbcDemoWriter;

    @Autowired
    private DataSource dataSource;

    @Bean
    public Job DBJdbcDemoJob() {
        return jobBuilderFactory.get("DBJdbcDemoJob")
                .start(dbJdbcDemoStep())
                .build();
    }

    @Bean
    public Step dbJdbcDemoStep() {
        return stepBuilderFactory.get("dbJdbcDemoStep")
                .<Customer, Customer>chunk(100)
                .reader(dbJdbcDemoReader())
                .writer(dbJdbcDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(100); // JDBC fetch size hint: read rows in batches of 100
        reader.setRowMapper((rs, rowNum) -> {
            return Customer.builder().id(rs.getLong("id"))
                    .firstName(rs.getString("firstName"))
                    .lastName(rs.getString("lastName"))
                    .birthdate(rs.getString("birthdate"))
                    .build();
        });

        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from Customer");

        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING); // the paging reader needs a sort key
        queryProvider.setSortKeys(sortKeys);

        reader.setQueryProvider(queryProvider);
        return reader;
    }
}
```
The Job and ItemWriter are not the focus of this article; an example writer is shown here, and the examples that follow use the same approach.
@Component("dbJdbcDemoWriter") public class DbJdbcDemoWriter implements ItemWriter<Customer> { @Override public void write(List<? extends Customer> items) throws Exception { for (Customer customer:items) System.out.println(customer); } }
FlatFileItemReader
Reads data from a CSV file.
```java
@Configuration
public class FlatFileDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Bean
    public Job flatFileDemoJob() {
        return jobBuilderFactory.get("flatFileDemoJob")
                .start(flatFileDemoStep())
                .build();
    }

    @Bean
    public Step flatFileDemoStep() {
        return stepBuilderFactory.get("flatFileDemoStep")
                .<Customer, Customer>chunk(100)
                .reader(flatFileDemoReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public FlatFileItemReader<Customer> flatFileDemoReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
        reader.setLinesToSkip(1);

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }
}
```
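The reader above skips one header line and splits each remaining line into the four named columns. A hypothetical customer.csv on the classpath might therefore look like this (the records are invented for illustration):

```
id,firstName,lastName,birthdate
1,Tony,Tester,1990-01-01
2,Nick,Newbie,1992-05-12
3,Ian,Intermediate,1988-09-30
```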
StaxEventItemReader
Reads data from an XML file.
```java
@Configuration
public class XmlFileDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("xmlFileDemoWriter")
    private ItemWriter<? super Customer> xmlFileDemoWriter;

    @Bean
    public Job xmlFileDemoJob() {
        return jobBuilderFactory.get("xmlFileDemoJob")
                .start(xmlFileDemoStep())
                .build();
    }

    @Bean
    public Step xmlFileDemoStep() {
        return stepBuilderFactory.get("xmlFileDemoStep")
                .<Customer, Customer>chunk(10)
                .reader(xmlFileDemoReader())
                .writer(xmlFileDemoWriter)
                .build();
    }

    @Bean
    @StepScope
    public StaxEventItemReader<Customer> xmlFileDemoReader() {
        StaxEventItemReader<Customer> reader = new StaxEventItemReader<>();
        reader.setResource(new ClassPathResource("customer.xml"));
        reader.setFragmentRootElementName("customer");

        XStreamMarshaller unMarshaller = new XStreamMarshaller();
        Map<String, Class> map = new HashMap<>();
        map.put("customer", Customer.class);
        unMarshaller.setAliases(map);
        reader.setUnmarshaller(unMarshaller);
        return reader;
    }
}
```
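StaxEventItemReader treats every <customer> element as one record (that is what setFragmentRootElementName configures), and the XStream unmarshaller maps the child elements onto the Customer fields by name. A hypothetical customer.xml might look like this (the surrounding root element name and the data are made up for illustration):

```xml
<customers>
    <customer>
        <id>1</id>
        <firstName>Tony</firstName>
        <lastName>Tester</lastName>
        <birthdate>1990-01-01</birthdate>
    </customer>
    <customer>
        <id>2</id>
        <firstName>Nick</firstName>
        <lastName>Newbie</lastName>
        <birthdate>1992-05-12</birthdate>
    </customer>
</customers>
```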
MultiResourceItemReader
Reads data from multiple files.
```java
@Configuration
public class MultipleFileDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Value("classpath*:/file*.csv")
    private Resource[] inputFiles;

    @Bean
    public Job multipleFileDemoJob() {
        return jobBuilderFactory.get("multipleFileDemoJob")
                .start(multipleFileDemoStep())
                .build();
    }

    @Bean
    public Step multipleFileDemoStep() {
        return stepBuilderFactory.get("multipleFileDemoStep")
                .<Customer, Customer>chunk(50)
                .reader(multipleResourceItemReader())
                .writer(flatFileDemoWriter)
                .build();
    }

    private MultiResourceItemReader<Customer> multipleResourceItemReader() {
        MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>();
        // The delegate does the actual parsing; MultiResourceItemReader feeds it each
        // matched resource in turn, overriding the resource set on the delegate itself.
        reader.setDelegate(flatFileReader());
        reader.setResources(inputFiles);
        return reader;
    }

    @Bean
    public FlatFileItemReader<Customer> flatFileReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
        // reader.setLinesToSkip(1);

        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});

        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet();
        reader.setLineMapper(lineMapper);
        return reader;
    }
}
```
Exception handling and the restart mechanism
For chunk-oriented steps, Spring Batch provides a way to manage state. State within a step is handled through the ItemStream interface, which gives your components access to the object that actually holds the state: the ExecutionContext, essentially a map of key/value pairs scoped to a particular step. The ExecutionContext is what makes restarting a step possible, because its contents are persisted in the JobRepository.
When an error occurs during execution, the last known state is written to the JobRepository. The next time the job runs, that state is used to repopulate the ExecutionContext, and processing can continue from where it left off.
A quick look at the ItemStream interface (sketched below):
- open() is called at the start of the step and populates the ExecutionContext with the values persisted in the database;
- update() is called at the end of each chunk (transaction) and updates the ExecutionContext;
- close() is called once all chunks have been processed.
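For reference, the interface looks roughly like this (org.springframework.batch.item.ItemStream):

```java
public interface ItemStream {

    // Called once when the step starts; restore any previously persisted state here.
    void open(ExecutionContext executionContext) throws ItemStreamException;

    // Called before each chunk is committed; record the current state here.
    void update(ExecutionContext executionContext) throws ItemStreamException;

    // Called once when the step finishes; release any held resources.
    void close() throws ItemStreamException;
}
```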
Now let's build an example.
Prepare a CSV file and give record 33 an invalid name. When that record is read, an exception is thrown and the job stops; a hypothetical excerpt of such a file is shown below.
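For illustration only, an invented excerpt of restartDemo.csv around the poisoned record might look like this (the file has no header line, and the firstName of record 33 is the literal string wrongName that the reader checks for):

```
32,Norma,Tester,1972-03-08
33,wrongName,Broken,1985-07-20
34,Henry,Reader,1969-11-02
```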
The ItemReader test code:
@Component("restartDemoReader") public class RestartDemoReader implements ItemStreamReader<Customer> { private Long curLine = 0L; private boolean restart = false; private FlatFileItemReader<Customer> reader = new FlatFileItemReader<>(); private ExecutionContext executionContext; RestartDemoReader public () { reader.setResource(new ClassPathResource("restartDemo.csv")); DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer(); tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"}); DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>(); lineMapper.setLineTokenizer(tokenizer); lineMapper.setFieldSetMapper((fieldSet -> { return Customer.builder().id(fieldSet.readLong("id")) .firstName(fieldSet.readString("firstName")) .lastName(fieldSet.readString("lastName")) .birthdate(fieldSet.readString("birthdate")) .build(); })); lineMapper.afterPropertiesSet(); reader.setLineMapper(lineMapper); } @Override public Customer read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException { Customer customer = null; this.curLine++; //如果是重啟,則從上一步讀取的行數繼續往下執行 if (restart) { reader.setLinesToSkip(this.curLine.intValue()-1); restart = false; System.out.println("Start reading from line: " + this.curLine); } reader.open(this.executionContext); customer = reader.read(); //當匹配到wrongName時,顯示拋出異常,終止程序 if (customer != null) { if (customer.getFirstName().equals("wrongName")) throw new RuntimeException("Something wrong. Customer id: " + customer.getId()); } else { curLine--; } return customer; } /** * 判斷是否是重啟job * @param executionContext * @throws ItemStreamException */ @Override public void open(ExecutionContext executionContext) throws ItemStreamException { this.executionContext = executionContext; if (executionContext.containsKey("curLine")) { this.curLine = executionContext.getLong("curLine"); this.restart = true; } else { this.curLine = 0L; executionContext.put("curLine", this.curLine.intValue()); } } @Override public void update(ExecutionContext executionContext) throws ItemStreamException { System.out.println("update curLine: " + this.curLine); executionContext.put("curLine", this.curLine); } @Override public void close() throws ItemStreamException { } }
Job configuration
Records are read in chunks of 10.
```java
@Configuration
public class RestartDemoJobConfiguration {

    @Autowired
    private JobBuilderFactory jobBuilderFactory;

    @Autowired
    private StepBuilderFactory stepBuilderFactory;

    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;

    @Autowired
    @Qualifier("restartDemoReader")
    private ItemReader<Customer> restartDemoReader;

    @Bean
    public Job restartDemoJob() {
        return jobBuilderFactory.get("restartDemoJob")
                .start(restartDemoStep())
                .build();
    }

    @Bean
    public Step restartDemoStep() {
        return stepBuilderFactory.get("restartDemoStep")
                .<Customer, Customer>chunk(10)
                .reader(restartDemoReader)
                .writer(flatFileDemoWriter)
                .build();
    }
}
```
When the job runs for the first time, it throws the exception at record 33, and curLine is 30 at that point.
If we then query the job repository (the BATCH_STEP_EXECUTION_CONTEXT table), we can see that curLine has been persisted as a key/value pair. Because the chunk size is 10, the last committed value of curLine when record 33 fails is 30.
Next, we fix the wrongName record and run the job again.
The open() method runs first and checks whether the persisted step ExecutionContext contains curLine. If it does, this run is a restart: the reader picks up curLine and resumes from that chunk onward.
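One detail worth spelling out: Spring Batch only treats the second run as a restart of the failed execution if the job is launched again with the same identifying JobParameters. A hedged sketch of such a launch; the runner class and the "input" parameter are illustrative assumptions, not part of the original example:

```java
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

// Hypothetical runner; assumes the restartDemoJob bean from the configuration above.
@Component
public class RestartDemoRunner {

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    @Qualifier("restartDemoJob")
    private Job restartDemoJob;

    public void launch() throws Exception {
        // Launch with the SAME identifying parameters as the failed run; the failed
        // JobInstance is then restarted and the step resumes from the persisted ExecutionContext.
        JobParameters params = new JobParametersBuilder()
                .addString("input", "restartDemo.csv")
                .toJobParameters();
        jobLauncher.run(restartDemoJob, params);
    }
}
```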