0 votes

I'm curious how one would manage to pass all available data from the reader down through the pipeline.

For example, I want the reader to pull all the data in and pass the entire result set down to the processor and the writer. The result set is small, so I'm not worried about resources. I thought I had implemented this properly by having all of the components (reader, writer, processor) receive and return a collection of the processed items.

While the results of the process appear to be fine, what I am seeing is that the job reads everything in, passes it down through the pipeline, then returns to the reader, reads everything again, passes it down, and so on.

I've considered creating an extra step to read all the data in and pass it down to a subsequent step, but I'm curious whether I can do this in a single step, and how.

The job looks like this:

@Bean
Job job() throws Exception {
    return jobs.get("job").start(step1()).build();
}

@Bean
protected Step step1() throws Exception {
    return steps.get("step1").<List<Domain>, List<Domain>>chunk(10)
            .reader(reader())
            .processor(processor())
            .writer(writer())
            .build();
}

//....

The reader, processor and writer accept and return a List, e.g.

class DomainItemProcessor implements ItemProcessor<List<Domain>, List<Domain>> {
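
The reader and writer are typed the same way, e.g. (simplified sketches; the real reader merges the two data sources):

class DomainItemReader implements ItemReader<List<Domain>> {
    @Override
    public List<Domain> read() throws Exception {
        List<Domain> all = new ArrayList<>();
        // ... populate from both data sources ...
        return all;
    }
}

class DomainItemWriter implements ItemWriter<List<Domain>> {
    @Override
    public void write(List<? extends List<Domain>> items) throws Exception {
        // persists each complete collection it receives
    }
}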
Are you sure you need to do it this way? Do you need to process a logical group of Domain objects at a time? If you do, I believe you'll need to have List<Domain> as your type parameter for everything: ItemReader<List<Domain>> and ItemWriter<List<Domain>>. You would likely need a custom ItemReader and ItemWriter to handle this. – Jared Gommels

Not sure if I understand your question correctly: what you want is to construct something (a list of Domain objects, in your case) and share it across multiple steps? It seems that in Spring Batch 3 you can do this by using JobScope (you may want a first step that always runs and constructs the job-scoped list). I solved this another way when I was using Spring Batch 1 (which may not be applicable to you: we ran every job in its own separate child context). – Adrian Shum

I have a custom reader and writer. I have to combine the results of two distinct data sources into a single domain object. I can't have two readers in a single step, as far as I know, so I'm creating two data sources in BatchConfiguration and passing them both into the reader, merging them, and returning a collection of complete domain objects. – nbpeth

@Adrian I have only a single step. I want to do all of my reading one time, then pass the resulting collection to the processor, process the collection in its entirety, and pass that to the writer. No batches, one swoop. – nbpeth

Just wondering: given such unintuitive reader logic, why don't you just write a simple tasklet instead? Alternatively, there is no reason why you can't write your own reader that reads from both data sources and constructs the items with your own logic. – Adrian Shum
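
A rough sketch of the single-shot merging reader Adrian describes (the DAO names are hypothetical): read everything once, merge it, and return null on the next call so Spring Batch knows the reader is exhausted.

public class MergingDomainReader implements ItemReader<List<Domain>> {

    private final PrimaryDomainDao primaryDao;     // hypothetical first data source
    private final SecondaryDomainDao secondaryDao; // hypothetical second data source
    private boolean alreadyRead = false;

    public MergingDomainReader(PrimaryDomainDao primaryDao, SecondaryDomainDao secondaryDao) {
        this.primaryDao = primaryDao;
        this.secondaryDao = secondaryDao;
    }

    @Override
    public List<Domain> read() throws Exception {
        if (alreadyRead) {
            return null; // signals to Spring Batch that there is nothing left to read
        }
        alreadyRead = true;
        List<Domain> merged = new ArrayList<>(primaryDao.findAll());
        merged.addAll(secondaryDao.findAll()); // combine the sources however your domain requires
        return merged;
    }
}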

3 Answers

5 votes

You could also implement it as a tasklet. Since you want to process all the data at once, you don't really have batch processing, and therefore the whole restart and failure handling of a "normal" Spring Batch step will not be used at all.

A tasklet like this could look as follows in pseudocode:

@Component
public class MyTasklet implements Tasklet {

    // ItemStreamReader/ItemStreamWriter are used here because open() and
    // close() are defined on ItemStream, not on ItemReader/ItemWriter
    @Autowired
    private ItemStreamReader<YourType> readerSpringBeanName;

    @Autowired
    private ItemProcessor<List<YourType>, List<YourType>> processorSpringBeanName;

    @Autowired
    private ItemStreamWriter<List<YourType>> writerSpringBeanName;

    @Override
    public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {
        readerSpringBeanName.open(new ExecutionContext());
        writerSpringBeanName.open(new ExecutionContext());

        // drain the reader completely into a single list
        List<YourType> items = new ArrayList<>();
        YourType readItem = readerSpringBeanName.read();
        while (readItem != null) {
            items.add(readItem);
            readItem = readerSpringBeanName.read();
        }

        // process and write the whole list in one go; the writer expects a
        // list of items, where each item is itself a List<YourType>
        writerSpringBeanName.write(Collections.singletonList(processorSpringBeanName.process(items)));

        readerSpringBeanName.close();
        writerSpringBeanName.close();
        return RepeatStatus.FINISHED;
    }
}

Moreover, depending on your use case, there may not even be a need to define a Spring Batch job at all.
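
For completeness, wiring the tasklet into a single-step job could look like this, assuming the same jobs/steps builder factories used in the question:

@Bean
Job taskletJob(MyTasklet myTasklet) throws Exception {
    return jobs.get("taskletJob")
            .start(steps.get("taskletStep")
                    .tasklet(myTasklet)
                    .build())
            .build();
}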

0 votes

The high-level design for this case would be:

  1. The reader will be a custom reader. It will return a List, or a wrapper that contains a list of Domain objects. The reader injects a DAO bean that performs a query and retrieves the list of Domain objects:

public class DomainList {
    private List<Domain> domains;

    // getters/setters
}

public class DomainReader implements ItemReader<DomainList> {

    @Autowired
    private DomainDAO domainDAO;

    private List<Domain> domains;

    @Override
    public DomainList read() throws Exception {
        if (this.domains == null) {
            // TODO: please replace with your business logic.
            this.domains = this.domainDAO.getListofDomains();
            DomainList result = new DomainList();
            result.setDomains(this.domains);
            return result;
        } else {
            // a null return tells Spring Batch the reader is done
            return null;
        }
    }
}

  2. The processor and writer will take DomainList as input, as sketched below.
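
For example, the matching signatures could look like this (class names are illustrative):

public class DomainListProcessor implements ItemProcessor<DomainList, DomainList> {
    @Override
    public DomainList process(DomainList item) throws Exception {
        // transform every Domain inside the wrapper in a single pass
        return item;
    }
}

public class DomainListWriter implements ItemWriter<DomainList> {
    @Override
    public void write(List<? extends DomainList> items) throws Exception {
        // with the reader above, items holds exactly one DomainList
        for (DomainList list : items) {
            // persist list.getDomains() here
        }
    }
}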

Note: the above is pseudocode.

Thanks, Nghia

0 votes

OK, this might be a little late, but here is my take on the implementation. Yes, you can make use of ItemReader, ItemProcessor and ItemWriter to do this. It may be a little overkill, but nevertheless it can be done.

The main issue I see (since the job keeps coming back to the reader) is that there needs to be a way to tell Spring that all items have been read from the ItemReader and there is nothing left to read. To do that, you have to explicitly return null when Spring tries to read more objects.

So this is an example of returning a List from the ItemReader; the read() method should have an implementation similar to the one below.

Leaving out the Redis implementation for now, the gist of it is that I declare a variable called iterateIndex, created and initialized in the constructor of the item reader as shown below. (I have also included a Redisson cache to store the list; again, that part can be omitted.)

public class XXXConfigItemReader implements
        ItemStreamReader<FeedbackConfigResponseModel> {

    private int iterateIndex;

    @Autowired
    Environment env;

    @Autowired
    RestTemplateBuilder templateBuilder;

    public XXXConfigItemReader() {
        this.iterateIndex = 0;
    }

and make sure that read() returns null once it reaches the list size:

public List<FeedbackConfigResponseModel> read()
        throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
    // Get the config details from the db (or read from a file line by line
    // and marshal it into a list)
    List<FeedbackConfigResponseModel> feedbackConfigModelList = new ArrayList<>();

    // On the first call, iterateIndex will not equal the list size,
    // so the entire list is returned in a single call
    if (feedbackConfigModelList == null || this.iterateIndex == feedbackConfigModelList.size()) {
        return null;
    } else {
        // set iterateIndex to the list size so that the second call returns null
        this.iterateIndex = feedbackConfigModelList.size();
        return feedbackConfigModelList;
    }
}

Hope this helps people who are running into the same issue.

EDIT: Showing how restTemplateBuilder is being used. Note that instead of RestTemplateBuilder you could just autowire the RestTemplate; I made use of RestTemplateBuilder to have some additional configuration for my project's needs.

Now this is the complete item reader, implemented using the ItemStreamReader interface:

public class XXXX implements ItemStreamReader<FeedbackConfigResponseModel> {

    private int iterateIndex;

    @Autowired
    Environment env;

    @Autowired
    RestTemplateBuilder templateBuilder;

    @Autowired
    RedissonClient redisClient;

    public XXXX() {
        this.iterateIndex = -1;
    }

    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        // nothing to open
    }

    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        // no state to persist between chunks
    }

    @Override
    public void close() throws ItemStreamException {
        // nothing to close
    }

    @Override
    public FeedbackConfigResponseModel read()
            throws Exception, UnexpectedInputException, ParseException, NonTransientResourceException {
        String feedbackConfigFetchUrl = null;
        ResponseEntity<FeedbackConfigResponseListModel> respModelEntity = null;
        // if the cache is empty, fetch the list via the rest template
        RList<FeedbackConfigResponseModel> rList = redisClient.getList(AppConstants.CACHE_DBCONFIG_LIST);
        List<FeedbackConfigResponseModel> feedbackConfigModelList = new ArrayList<>();
        FeedbackConfigResponseModel firstDbItem = rList.get(0);
        if (firstDbItem == null) {
            feedbackConfigFetchUrl = this.env.getProperty("restTemplate.default.url") + "/test";
            respModelEntity = templateBuilder.build().getForEntity(feedbackConfigFetchUrl,
                    FeedbackConfigResponseListModel.class);
            System.out.println("Response Model from template:" + respModelEntity.getBody());
            feedbackConfigModelList = respModelEntity.getBody() == null ? null
                    : respModelEntity.getBody().getFeedbackResponseList();
            rList.addAll(feedbackConfigModelList);
        } else {
            System.out.println("coming inside else");
            feedbackConfigModelList = rList;
        }

        // iterateIndex starts at -1 and is incremented before each get(),
        // so the exhaustion check uses iterateIndex + 1 to avoid reading past the end
        if (feedbackConfigModelList == null || this.iterateIndex + 1 == feedbackConfigModelList.size()) {
            return null;
        } else {
            this.iterateIndex++;
            System.out.println("iterating index " + iterateIndex + " of " + feedbackConfigModelList.size());
            return feedbackConfigModelList.get(iterateIndex);
        }
    }
}
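
One caveat worth adding: this reader keeps its position in the iterateIndex field, so the bean should be step-scoped to get fresh state on every execution. A minimal sketch (the bean method name is illustrative):

@Bean
@StepScope // a new reader instance per step execution, so iterateIndex starts clean
public XXXX feedbackConfigItemReader() {
    return new XXXX();
}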