How to Customize an ItemReader in Spring Batch

Spring Batch supports a variety of input sources out of the box, such as flat files and databases. Sometimes, however, you need to read from a source that is not supported by default, and then you have to implement the data source yourself with a custom ItemReader. This article shows how to do that with an example.

Creating a Custom ItemReader

Creating a custom ItemReader involves two steps:

  • Create a class that implements the ItemReader interface, supplying the type of object it returns as the type parameter T.
  • Implement the interface's T read() method according to the rule below.

The read() method returns the next object if one exists; otherwise it returns null.
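
For reference, the ItemReader interface in org.springframework.batch.item looks roughly like this (version-specific annotations omitted):

public interface ItemReader<T> {

    /**
     * Reads a piece of input data and advances to the next item.
     * Returns null when the end of the input has been reached.
     */
    T read() throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException;
}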

The custom ItemReader below returns StuDto objects representing students taking an online course. To keep things simple, the data is held in memory. StuDto is a plain data transfer object:

// Lombok's @Data generates the getters, setters, equals/hashCode and toString
@Data
public class StuDto {
    private String emailAddress;
    private String name;
    private String purchasedPackage;
}

Create the ItemReader by following these steps:

  • Create an InMemoryStudentReader class.
  • Implement the ItemReader interface with StuDto as the type parameter.
  • Add a List<StuDto> studentData field holding the students taking the course.
  • Add a nextStudentIndex field holding the index of the next StuDto to return.
  • Add a private initialize() method that populates the student data and resets the index to 0.
  • Add a constructor that calls initialize().
  • Implement read() with the following rule: if there is another student, return that StuDto and increment the index; otherwise return null.

The InMemoryStudentReader code is as follows:

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import org.springframework.batch.item.ItemReader;

public class InMemoryStudentReader implements ItemReader<StuDto> {
    private int nextStudentIndex;
    private List<StuDto> studentData; 
    InMemoryStudentReader() {
        initialize();
    }
 
    private void initialize() {
        StuDto tony = new StuDto();
        tony.setEmailAddress("[email protected]");
        tony.setName("Tony Tester");
        tony.setPurchasedPackage("master");
 
        StuDto nick = new StuDto();
        nick.setEmailAddress("[email protected]");
        nick.setName("Nick Newbie");
        nick.setPurchasedPackage("starter");
 
        StuDto ian = new StuDto();
        ian.setEmailAddress("[email protected]");
        ian.setName("Ian Intermediate");
        ian.setPurchasedPackage("intermediate");
 
        studentData = Collections.unmodifiableList(Arrays.asList(tony, nick, ian));
        nextStudentIndex = 0;
    }
 
    @Override
    public StuDto read() throws Exception {
        StuDto nextStudent = null;
 
        if (nextStudentIndex < studentData.size()) {
            nextStudent = studentData.get(nextStudentIndex);
            nextStudentIndex++;
        } 
        return nextStudent;
    }
}

Once the custom ItemReader is written, it has to be configured as a bean so that a Spring Batch job can use it. The configuration is shown below.

Configuring the ItemReader Bean

The configuration class looks like this:

@Configuration
public class InMemoryStudentJobConfig { 
    @Bean
    ItemReader<StuDto> inMemoryStudentReader() {
        return new InMemoryStudentReader();
    }
}

The class is annotated with @Configuration to mark it as a configuration class. It declares a method that returns ItemReader, annotates the method with @Bean, and the method body simply returns a new InMemoryStudentReader.
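
To actually run the reader, a step can reference this bean. The following fragment is only a sketch and not part of the original article: it assumes a hypothetical studentWriter bean that accepts StuDto items, uses the same pre-5.0 StepBuilderFactory API as the examples later in this article, and would live inside the same configuration class.

@Autowired
private StepBuilderFactory stepBuilderFactory;

@Autowired
@Qualifier("studentWriter")
private ItemWriter<? super StuDto> studentWriter; // hypothetical writer bean

@Bean
public Step inMemoryStudentStep() {
    return stepBuilderFactory.get("inMemoryStudentStep")
            .<StuDto, StuDto>chunk(10)       // read and write 10 students per chunk
            .reader(inMemoryStudentReader()) // the custom reader defined above
            .writer(studentWriter)
            .build();
}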

Summary

This article showed how to build a custom ItemReader through an example. The key points are:

  • A custom ItemReader implements the ItemReader interface.
  • The interface takes the type of object being read as its type parameter (T).
  • The read() method returns the next object if one exists, and null otherwise.

Spring Batch ItemReaders

This part focuses on ItemReader: how to read data from different kinds of sources, and how exception handling and restart work.

JdbcPagingItemReader

Reading data from a database

@Configuration
public class DBJdbcDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
 
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
 
    @Autowired
    @Qualifier("dbJdbcDemoWriter")
    private ItemWriter<? super Customer> dbJdbcDemoWriter;
 
    @Autowired
    private DataSource dataSource;
 
    @Bean
    public Job DBJdbcDemoJob(){
        return jobBuilderFactory.get("DBJdbcDemoJob")
                .start(dbJdbcDemoStep())
                .build();
     }
 
    @Bean
    public Step dbJdbcDemoStep() {
        return stepBuilderFactory.get("dbJdbcDemoStep")
                .<Customer,Customer>chunk(100)
                .reader(dbJdbcDemoReader())
                .writer(dbJdbcDemoWriter)
                .build();
    }
 
    @Bean
    @StepScope
    public JdbcPagingItemReader<Customer> dbJdbcDemoReader() {
        JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();
 
        reader.setDataSource(this.dataSource);
        reader.setFetchSize(100); // fetch rows from the database in batches of 100
        reader.setRowMapper((rs,rowNum)->{
            return Customer.builder().id(rs.getLong("id"))
                    .firstName(rs.getString("firstName"))
                    .lastName(rs.getString("lastName"))
                    .birthdate(rs.getString("birthdate"))
                    .build();
 
        });
 
        MySqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
        queryProvider.setSelectClause("id, firstName, lastName, birthdate");
        queryProvider.setFromClause("from Customer");
        Map<String, Order> sortKeys = new HashMap<>(1);
        sortKeys.put("id", Order.ASCENDING);
        queryProvider.setSortKeys(sortKeys); 
        reader.setQueryProvider(queryProvider); 
        return reader; 
    }
}

The Job and ItemWriter are not the focus of this article; they are shown here only as an example, and the later examples follow the same pattern.

@Component("dbJdbcDemoWriter")
public class DbJdbcDemoWriter implements ItemWriter<Customer> {
    @Override
    public void write(List<? extends Customer> items) throws Exception {
        for (Customer customer:items)
            System.out.println(customer); 
    }
}
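
The Customer class itself is not shown in the original article. Based on how it is used by the readers and the writer, a minimal Lombok-based sketch might look like this (the field types are assumptions):

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

// Minimal sketch of the Customer DTO assumed by the examples above and below
@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class Customer {
    private Long id;
    private String firstName;
    private String lastName;
    private String birthdate;
}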

FlatFileItemReader

Reading data from a CSV file

 
@Configuration
public class FlatFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory; 
    @Autowired
    private StepBuilderFactory stepBuilderFactory; 
    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter; 
    @Bean
    public Job flatFileDemoJob(){
        return jobBuilderFactory.get("flatFileDemoJob")
                .start(flatFileDemoStep())
                .build(); 
    }
 
    @Bean
    public Step flatFileDemoStep() {
        return stepBuilderFactory.get("flatFileDemoStep")
                .<Customer,Customer>chunk(100)
                .reader(flatFileDemoReader())
                .writer(flatFileDemoWriter)
                .build();
    }
 
    @Bean
    @StepScope
    public FlatFileItemReader<Customer> flatFileDemoReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
        reader.setResource(new ClassPathResource("customer.csv"));
        reader.setLinesToSkip(1);
 
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id","firstName","lastName","birthdate"});
 
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet(); 
        reader.setLineMapper(lineMapper); 
        return reader; 
    }
}
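
The reader above skips one header line and maps the columns named in the tokenizer. A customer.csv on the classpath could therefore look like this (the rows are sample data for illustration only):

id,firstName,lastName,birthdate
1,John,Doe,1980-01-01
2,Jane,Roe,1985-05-20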

StaxEventItemReader

Reading data from an XML file

@Configuration
public class XmlFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory; 
    @Autowired
    private StepBuilderFactory stepBuilderFactory; 
    @Autowired
    @Qualifier("xmlFileDemoWriter")
    private ItemWriter<? super Customer> xmlFileDemoWriter; 
    @Bean
    public Job xmlFileDemoJob(){
        return jobBuilderFactory.get("xmlFileDemoJob")
                .start(xmlFileDemoStep())
                .build(); 
    } 
    @Bean
    public Step xmlFileDemoStep() {
        return stepBuilderFactory.get("xmlFileDemoStep")
                .<Customer,Customer>chunk(10)
                .reader(xmlFileDemoReader())
                .writer(xmlFileDemoWriter)
                .build();
    } 
    @Bean
    @StepScope
    public StaxEventItemReader<Customer> xmlFileDemoReader() {
        StaxEventItemReader<Customer> reader = new StaxEventItemReader<>(); 
        reader.setResource(new ClassPathResource("customer.xml"));
        reader.setFragmentRootElementName("customer");  
        XStreamMarshaller unMarshaller = new XStreamMarshaller();
        Map<String,Class> map = new HashMap<>();
        map.put("customer",Customer.class);
        unMarshaller.setAliases(map);
        reader.setUnmarshaller(unMarshaller);  
        return reader; 
    }
}
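
The fragment root element is set to customer and each fragment is unmarshalled into a Customer through the XStream alias, so a matching customer.xml could look like this (sample data for illustration only):

<customers>
    <customer>
        <id>1</id>
        <firstName>John</firstName>
        <lastName>Doe</lastName>
        <birthdate>1980-01-01</birthdate>
    </customer>
</customers>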

MultiResourceItemReader

Reading data from multiple files

@Configuration
public class MultipleFileDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
 
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
 
    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;
 
    @Value("classpath*:/file*.csv")
    private Resource[] inputFiles;
 
    @Bean
    public Job multipleFileDemoJob(){
        return jobBuilderFactory.get("multipleFileDemoJob")
                .start(multipleFileDemoStep())
                .build(); 
    }
 
    @Bean
    public Step multipleFileDemoStep() {
        return stepBuilderFactory.get("multipleFileDemoStep")
                .<Customer,Customer>chunk(50)
                .reader(multipleResourceItemReader())
                .writer(flatFileDemoWriter)
                .build();
    }
 
    private MultiResourceItemReader<Customer> multipleResourceItemReader() { 
        MultiResourceItemReader<Customer> reader = new MultiResourceItemReader<>(); 
        reader.setDelegate(flatFileReader());
        reader.setResources(inputFiles); 
        return reader;
    }
 
    @Bean
    public FlatFileItemReader<Customer> flatFileReader() {
        FlatFileItemReader<Customer> reader = new FlatFileItemReader<>();
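        // Note: when this reader is used as the delegate of the MultiResourceItemReader above,
        // the resource set on the next line is replaced by each matched file, so it is effectively ignored.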
        reader.setResource(new ClassPathResource("customer.csv"));
       // reader.setLinesToSkip(1);
 
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id","firstName","lastName","birthdate"});
 
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet(); 
        reader.setLineMapper(lineMapper); 
        return reader; 
    }
}

Exception Handling and Restart

For chunk-oriented steps, Spring Batch provides facilities for managing state. State within a step is handled through the ItemStream interface, which gives a component access to the ExecutionContext. The ExecutionContext is essentially a map of key/value pairs that stores the state of a particular step, and because it is persisted in the JobRepository it is what makes restarting a step possible.

When an error occurs during execution, the latest state is written to the JobRepository. The next time the job runs, that persisted state is used to repopulate the ExecutionContext, and processing can resume from where it left off.

Looking at the ItemStream interface:

  • open() is called at the start of the step and populates the ExecutionContext with any values persisted in the database.
  • update() is called at the end of each chunk (before the transaction commits) and updates the ExecutionContext.
  • close() is called after all chunks have been processed.
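
For reference, the ItemStream interface in org.springframework.batch.item is essentially:

public interface ItemStream {

    // Called once when the step starts; restore any saved state from the ExecutionContext here.
    void open(ExecutionContext executionContext) throws ItemStreamException;

    // Called periodically (before each chunk commit); record the current state in the ExecutionContext here.
    void update(ExecutionContext executionContext) throws ItemStreamException;

    // Called when the step finishes; release resources here.
    void close() throws ItemStreamException;
}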

Let's build an example.

Prepare a CSV file and put a deliberately bad name in record 33; when the reader reaches that record it throws an exception and the job terminates.
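
The reader below fails whenever firstName equals "wrongName", so the bad record in restartDemo.csv (a hypothetical sample row) might look like:

33,wrongName,Tester,1990-03-03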

The ItemReader test code:

 
@Component("restartDemoReader")
public class RestartDemoReader implements ItemStreamReader<Customer> {  
    private Long curLine = 0L;
    private boolean restart = false; 
    private FlatFileItemReader<Customer> reader = new FlatFileItemReader<>(); 
    private ExecutionContext executionContext;
    public RestartDemoReader() {
        reader.setResource(new ClassPathResource("restartDemo.csv")); 
        DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
        tokenizer.setNames(new String[]{"id", "firstName", "lastName", "birthdate"});
 
        DefaultLineMapper<Customer> lineMapper = new DefaultLineMapper<>();
        lineMapper.setLineTokenizer(tokenizer);
        lineMapper.setFieldSetMapper((fieldSet -> {
            return Customer.builder().id(fieldSet.readLong("id"))
                    .firstName(fieldSet.readString("firstName"))
                    .lastName(fieldSet.readString("lastName"))
                    .birthdate(fieldSet.readString("birthdate"))
                    .build();
        }));
        lineMapper.afterPropertiesSet(); 
        reader.setLineMapper(lineMapper);
    }
 
    @Override
    public Customer read() throws Exception, UnexpectedInputException, ParseException,
            NonTransientResourceException { 
        Customer customer = null; 
        this.curLine++;
        // If this is a restart, skip the lines already read and continue from where we left off
        if (restart) {
            reader.setLinesToSkip(this.curLine.intValue()-1);
            restart = false;
            System.out.println("Start reading from line: " + this.curLine);
        }
 
        reader.open(this.executionContext); 
        customer = reader.read();
        // When the first name matches "wrongName", throw an exception explicitly to terminate the job
        if (customer != null) {
            if (customer.getFirstName().equals("wrongName"))
                throw new RuntimeException("Something wrong. Customer id: " + customer.getId());
        } else {
            curLine--;
        }
        return customer;
    }
 
    /**
     * Determines whether this is a restarted job by checking for a previously saved line number.
     * @param executionContext
     * @throws ItemStreamException
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        this.executionContext = executionContext;
        if (executionContext.containsKey("curLine")) {
            this.curLine = executionContext.getLong("curLine");
            this.restart = true;
        } else {
            this.curLine = 0L;
            executionContext.put("curLine", this.curLine.intValue());
        } 
    }
 
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        System.out.println("update curLine: " + this.curLine);
        executionContext.put("curLine", this.curLine); 
    }
 
    @Override
    public void close() throws ItemStreamException { 
    }
}

Job configuration

Records are read in chunks of 10:

@Configuration
public class RestartDemoJobConfiguration {
    @Autowired
    private JobBuilderFactory jobBuilderFactory;
 
    @Autowired
    private StepBuilderFactory stepBuilderFactory;
 
    @Autowired
    @Qualifier("flatFileDemoWriter")
    private ItemWriter<? super Customer> flatFileDemoWriter;
 
    @Autowired
    @Qualifier("restartDemoReader")
    private ItemReader<Customer> restartDemoReader;
 
    @Bean
    public Job restartDemoJob(){
        return jobBuilderFactory.get("restartDemoJob")
                .start(restartDemoStep())
                .build(); 
    }
 
    @Bean
    public Step restartDemoStep() {
        return stepBuilderFactory.get("restartDemoStep")
                .<Customer,Customer>chunk(10)
                .reader(restartDemoReader)
                .writer(flatFileDemoWriter)
                .build();
    }
}

On the first run, the program throws an exception at record 33, at which point the persisted curLine value is 30.

If we then query the step execution context table (BATCH_STEP_EXECUTION_CONTEXT), we can see that curLine has been persisted to the database as a key/value pair. Since the chunk size is 10, the last committed chunk ended at record 30, which is why curLine is 30 when record 33 fails.

Next, we fix the wrongName record and run the program again.

This time the open() method checks whether curLine exists in the persisted step context. If it does, this is a rerun: the reader picks up curLine and continues processing from that chunk onward.
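
How the job is launched is outside the scope of the original article. As a rough sketch (assuming a JobLauncher and the restartDemoJob bean are available): a failed execution is restarted simply by launching the job again with the same JobParameters, which makes Spring Batch resume the existing JobInstance instead of starting a new one.

@Autowired
private JobLauncher jobLauncher;

@Autowired
@Qualifier("restartDemoJob")
private Job restartDemoJob;

public void runJob() throws Exception {
    // Reusing the same JobParameters as the failed run triggers a restart
    // of the existing JobInstance rather than a brand new execution.
    JobParameters parameters = new JobParametersBuilder()
            .addString("run.id", "demo") // hypothetical parameter value
            .toJobParameters();
    jobLauncher.run(restartDemoJob, parameters);
}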
