1
votes
@Qualifier("billingJob")
@Bean
public Job billingJob(@Qualifier("fetchAgreementTasklet")Step fetchAgreementTasklet,
@Qualifier("fetchAgreementRecurringItemsTasklet") Step fetchAgreementRecurringItemsTasklet,
@Qualifier("fetchItemsHistoryTasklet") Step fetchItemsHistoryTasklet ,
@Qualifier("populateAgreementStep") Step populateAgreementStep,
@Qualifier("populateRecurringItemStep") Step populateRecurringItemStep ,
@Qualifier("populateRecurringItemHistoryStep") Step populateRecurringItemHistoryStep ) {

  return jobsBuilderFactory.get("billingJob")                                
           .incrementer(new RunIdIncrementer())
           .flow(fetchAgreementTasklet)
           .next(fetchAgreementRecurringItemsTasklet)
           .next(fetchItemsHistoryTasklet)

           .next(populateAgreementStep).next(populateRecurringItemStep)
           .next(populateRecurringItemHistoryStep)
           .end().build();
}

    @Qualifier("populateRecurringItemStep")
    @Bean
    public Step populateRecurringItemStep(
    @Qualifier("recurringItemReader") ItemReader<RecurringItemRaw> recurringItemReader,
    @Qualifier("recurringItemProcessor") ItemProcessor<RecurringItemRaw, RecurringItem> recurringItemProcessor,
    @Qualifier("recurringItemWriter") ItemWriter<RecurringItem> recurringItemWriter) {
       return stepBuilderFactory.get("populateRecurringItemStep")
        .<RecurringItemRaw, RecurringItem> chunk(10)
        .reader(recurringItemReader).processor(recurringItemProcessor)
        .writer(recurringItemWriter).build();
    } 

@Qualifier("recurringItemReader")
@Bean
public ItemReader<RecurringItemRaw> recurringItemReader() {
FlatFileItemReader<RecurringItemRaw> reader = new FlatFileItemReader<RecurringItemRaw>();
String file = salesforceConfiguration       .getFileLocation(SalesforceConfiguration.TYPE_AGREEMENT_ITEMS);
    LOG.info("::::::: Reading RecurringItem File ::::::: " + file);
    reader.setResource(new FileSystemResource(file));
    reader.setLinesToSkip(1);
    reader.setLineMapper(new DefaultLineMapper<RecurringItemRaw>() {
        {
            setLineTokenizer(new DelimitedLineTokenizer() {
                {
                    setNames(new String[] { "id", "name", "agreementId",
                            "cost", "quantity" });
                }
            });
            setFieldSetMapper(new BeanWrapperFieldSetMapper<RecurringItemRaw>() {
                {
                    setTargetType(RecurringItemRaw.class);
                }
            });
        }
    });
    return reader;
}

@Qualifier("recurringItemProcessor")
@Bean
public ItemProcessor<RecurringItemRaw, RecurringItem> recurringItemprocessor() {
return new RecurringItemProcessor();
}

@Qualifier("recurringItemWriter")
@Bean
public ItemWriter<RecurringItem> recurringItemWriter(DataSource dataSource) {
JdbcBatchItemWriter<RecurringItem> writer = new JdbcBatchItemWriter<RecurringItem>();
writer.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider<RecurringItem>());
writer.setSql(PostgresqlDBQuery.INSERT_RECURRING_ITEM_QRY);
writer.setDataSource(dataSource);
return writer;
}

recurringItemprocessor() is used to convert the string values to other formats

recurringItemWriter() is used to insert record into my postgresql database

recurringItemReader() is used to read the csv file.


This is the processor class public class RecurringItemProcessor implements ItemProcessor {

private static final Logger LOG = Logger.getLogger(RecurringItemProcessor.class.getName());

@Override
public RecurringItem process(final RecurringItemRaw recurringItemRaw) throws Exception {
    LOG.info("Processing recurring item");
    final RecurringItem item = new RecurringItem(); 
    item.setId(recurringItemRaw.getId());
    item.setName(recurringItemRaw.getName());
    item.setAgreementId(recurringItemRaw.getAgreementId());
    if (recurringItemRaw.getCost().trim().isEmpty()){
        item.setCost(BigDecimal.ZERO);
    }
    else{
        item.setCost(new BigDecimal(recurringItemRaw.getCost())); 
    }
    if (recurringItemRaw.getQuantity().trim().isEmpty()){
        item.setQuantity(0);
    }else{
        item.setQuantity(new BigDecimal(recurringItemRaw.getQuantity()).intValue());
    }
    LOG.info(item.toString());
    return item;
}

}

I'm having issue with "populateRecurringItemStep" where this step running in a endloop. How to fix this issue .. I'm using the spring boot no xml configurations ..

2
Can you add also populateRecurringItemStep configuration or component? - Nenad Bozic
I have added the code - Charitha
Still nothing that rings the bell. Guess you can post full configuration and also problematic reader, processor and writer. - Nenad Bozic
Please have a look now i edit the code - Charitha
I'm reading item by item so it should return the processed item . If i return null instead of the object there will be no items saving in the database. In this case it is not reading the same line over and over, it repeats all the items so i want to end once all the items are read.. - Charitha

2 Answers

6
votes

Instead of using flow(), use start(), as following :

return jobsBuilderFactory.get("billingJob")                                
           .incrementer(new RunIdIncrementer())
           .start(fetchAgreementTasklet)
           .next(fetchAgreementRecurringItemsTasklet)
           .next(fetchItemsHistoryTasklet)

           .next(populateAgreementStep).next(populateRecurringItemStep)
           .next(populateRecurringItemHistoryStep)
           .end().build();
}
1
votes

Try to do something like this:

    @Bean
    public Step stepOne(){
        return steps.get("stepOne")
                .tasklet(new MyTaskOne())
                .build();
    }

    @Bean
    public Step stepTwo(){
        return steps.get("stepTwo")
                .tasklet(new MyTaskTwo())
                .build();
    } 

    @Bean
    public Job demoJob(){
        return jobs.get("demoJob")
                .incrementer(new RunIdIncrementer())
                .start(stepOne())
                .next(stepTwo())
                .build();
    }