0
votes

I have a custom reader that reads data from a CSV file:

    package org.kp.oppr.remediation.batch.csv;

import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.remediation.batch.csv.FlatFileItemReaderNewLine;
import org.remediation.batch.model.RawItem;
import org.remediation.batch.model.RawItemLineMapper;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.StepExecution;
import org.springframework.batch.core.StepExecutionListener;
import org.springframework.batch.core.annotation.BeforeStep;
import org.springframework.batch.item.file.LineCallbackHandler;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.mapping.FieldSetMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.file.transform.FieldSet;
import org.springframework.batch.item.file.transform.LineTokenizer;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;
import org.springframework.validation.BindException;

/**
 * CSV reader that maps each delimited line to a {@link RawItem} whose record is
 * an insertion-ordered map of the configured column names to their trimmed
 * values (blank values become {@code null}).
 * <p>
 * The instance is its own {@link LineCallbackHandler} (registered as the
 * skipped-lines callback in the constructor) and its own
 * {@link FieldSetMapper}. When {@link #handleLine(String)} runs, it installs a
 * {@link DelimitedLineTokenizer} on the line mapper. It also stores the column
 * names in the step's execution context under {@code "COLUMNS_NAMES_KEY"}.
 */
public class RawItemCsvReader extends MultiResourceItemReader<RawItem>
        implements StepExecutionListener, LineCallbackHandler,
        FieldSetMapper<RawItem> {

    static final Logger LOGGER = LogManager.getLogger(RawItemCsvReader.class);

    /** Step execution-context key under which {@link #columnNames} are stored. */
    private static final String COLUMN_NAMES_KEY = "COLUMNS_NAMES_KEY";

    private StepExecution stepExecution;
    private DefaultLineMapper<RawItem> lineMapper;
    private String[] columnNames;
    private Resource[] resources;
    private char quoteCharacter = DelimitedLineTokenizer.DEFAULT_QUOTE_CHARACTER;
    /** Field delimiter; when {@code null}, DelimitedLineTokenizer's own default is kept. */
    private String delimiter;

    /**
     * Tells the parent to skip no leading lines and registers this instance as
     * the skipped-lines callback.
     * <p>
     * NOTE(review): with zero lines to skip, a skipped-lines callback would
     * normally never fire. The parent class in this file stores both settings
     * but never reads them. Confirm how {@link #handleLine(String)} is meant to
     * be triggered.
     */
    public RawItemCsvReader() {
        setLinesToSkip(0);
        setSkippedLinesCallback(this);
    }

    /**
     * Installs a fresh {@link RawItemLineMapper}. This happens here rather than
     * in the constructor so that the overridden
     * {@link #setLineMapper(LineMapper)} runs on a fully constructed object.
     */
    @Override
    public void afterPropertiesSet() {
        final DefaultLineMapper<RawItem> mapper = new RawItemLineMapper();
        setLineMapper(mapper);
    }

    /**
     * Satisfies the {@link LineCallbackHandler} contract and acts as the
     * {@code skippedLinesCallback}. It configures the line mapper with a
     * delimited tokenizer and makes this instance the field-set mapper. The
     * line content itself is not inspected.
     *
     * @param line the skipped line (unused)
     * @throws IllegalStateException if {@link #afterPropertiesSet()} has not
     *             installed a line mapper yet
     */
    @Override
    public void handleLine(String line) {
        DefaultLineMapper<RawItem> mapper = getLineMapper();
        Assert.state(mapper != null,
                "No line mapper installed; afterPropertiesSet() must run before lines are handled");
        mapper.setLineTokenizer(getTokenizer());
        mapper.setFieldSetMapper(this);
    }

    /**
     * Builds a strict {@link DelimitedLineTokenizer} from the configured quote
     * character, delimiter and column names. As a side effect, it publishes
     * the column names to the step execution context.
     */
    private LineTokenizer getTokenizer() {
        DelimitedLineTokenizer lineTokenizer = new DelimitedLineTokenizer();
        lineTokenizer.setQuoteCharacter(quoteCharacter);
        // A null delimiter would break tokenizing, so keep the tokenizer's
        // default unless one was explicitly configured.
        if (delimiter != null) {
            lineTokenizer.setDelimiter(delimiter);
        }
        lineTokenizer.setStrict(true);
        lineTokenizer.setNames(columnNames);
        addColumnNames();
        return lineTokenizer;
    }

    /**
     * Stores the configured column names in the current step's execution context.
     *
     * @throws IllegalStateException if no {@link StepExecution} has been captured yet
     */
    private void addColumnNames() {
        Assert.state(stepExecution != null,
                "No StepExecution available; beforeStep(..) has not been called");
        stepExecution.getExecutionContext().put(COLUMN_NAMES_KEY, columnNames);
    }

    /**
     * Keeps a local copy of the resources (attached to every mapped item) and
     * forwards them to the parent reader.
     */
    @Override
    public void setResources(Resource[] resources) {
        this.resources = resources;
        super.setResources(resources);
    }

    /**
     * Provides access to an otherwise hidden field in the parent class. This is
     * needed because the {@link LineMapper} is reconfigured based on file
     * contents.
     *
     * @param lineMapper must be a {@link DefaultLineMapper}
     * @throws IllegalArgumentException if {@code lineMapper} is not a {@link DefaultLineMapper}
     */
    @Override
    public void setLineMapper(LineMapper<RawItem> lineMapper) {
        if (!(lineMapper instanceof DefaultLineMapper)) {
            throw new IllegalArgumentException(
                    "Must specify a DefaultLineMapper");
        }
        this.lineMapper = (DefaultLineMapper<RawItem>) lineMapper;
        super.setLineMapper(lineMapper);
    }

    private DefaultLineMapper<RawItem> getLineMapper() {
        return this.lineMapper;
    }

    /**
     * Satisfies the {@link FieldSetMapper} contract. Builds a {@link RawItem}
     * whose record maps each configured column name to its trimmed value, or
     * to {@code null} when the value is blank.
     *
     * @param fs the tokenized line; {@code null} yields {@code null}
     * @return the mapped item, or {@code null} when {@code fs} is {@code null}
     * @throws BindException declared by the {@link FieldSetMapper} contract
     * @throws IllegalStateException if no column names are configured
     */
    @Override
    public RawItem mapFieldSet(FieldSet fs) throws BindException {
        if (fs == null) {
            return null;
        }
        Assert.state(columnNames != null, "columnNames must be set before records can be mapped");
        Map<String, String> record = new LinkedHashMap<String, String>();
        for (String columnName : this.columnNames) {
            record.put(columnName,
                    StringUtils.trimToNull(fs.readString(columnName)));
        }
        RawItem item = new RawItem();
        // NOTE(review): this attaches the whole configured resource array, not
        // the specific file the line came from. Confirm that this is intended.
        item.setResource(resources);
        item.setRecord(record);
        return item;
    }

    /** Captures the step execution when invoked through the {@code @BeforeStep} annotation. */
    @BeforeStep
    public void saveStepExecution(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    /**
     * Captures the step execution when invoked through the
     * {@link StepExecutionListener} interface. Previously this was a no-op, so
     * {@link #addColumnNames()} relied solely on the annotated callback.
     */
    @Override
    public void beforeStep(StepExecution stepExecution) {
        this.stepExecution = stepExecution;
    }

    /** Logs the read and skip counts for the step and reports {@link ExitStatus#COMPLETED}. */
    @Override
    public ExitStatus afterStep(StepExecution stepExecution) {
        LOGGER.info("End Raw Read Step for lines read: {} lines skipped: {}",
                stepExecution.getReadCount(), stepExecution.getReadSkipCount());
        return ExitStatus.COMPLETED;
    }

    public void setDelimiter(String delimiter) {
        this.delimiter = delimiter;
    }

    public void setQuoteCharacter(char quoteCharacter) {
        this.quoteCharacter = quoteCharacter;
    }

    public String[] getColumnNames() {
        return columnNames;
    }

    public void setColumnNames(String[] columnNames) {
        this.columnNames = columnNames;
    }

    public String getDelimiter() {
        return delimiter;
    }

}

I want to use a MultiResourceItemReader together with this class to read multiple files that share the same extension, using Spring's MultiResourceItemReader to do the job. How do I configure the `private ResourceAwareItemReaderItemStream delegate;` field for this class?

package org.kp.oppr.remediation.batch.csv;

import java.util.Arrays;
import java.util.Comparator;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemStream;
import org.springframework.batch.item.ItemStreamException;
import org.springframework.batch.item.ParseException;
import org.springframework.batch.item.UnexpectedInputException;
import org.springframework.batch.item.file.LineCallbackHandler;
import org.springframework.batch.item.file.LineMapper;
import org.springframework.batch.item.file.ResourceAwareItemReaderItemStream;
import org.springframework.batch.item.util.ExecutionContextUserSupport;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.io.Resource;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;

/**
 * {@link ItemReader} that reads items sequentially from an array of
 * {@link Resource}s. Each resource is handed in turn to a single-resource
 * {@link #setDelegate(ResourceAwareItemReaderItemStream) delegate}.
 * <p>
 * Resources are sorted with the configured comparator, by file name by
 * default. The current resource index and the item index within it are
 * written to the {@link ExecutionContext} on {@link #update(ExecutionContext)}
 * when {@code saveState} is true. They are restored on
 * {@link #open(ExecutionContext)}, and the delegate is then fast-forwarded by
 * re-reading the items already consumed from the current resource.
 */
public class MultiResourceItemReader<T> implements ItemReader<T>, ItemStream, InitializingBean,
        ResourceAwareItemReaderItemStream<T> {

    static final Logger LOGGER = LogManager.getLogger(MultiResourceItemReader.class);

    private final ExecutionContextUserSupport executionContextUserSupport = new ExecutionContextUserSupport();

    private ResourceAwareItemReaderItemStream<? extends T> delegate;

    private Resource[] resources;

    private MultiResourceIndex index = new MultiResourceIndex();

    private boolean saveState = true;

    // signals there are no resources to read -> just return null on first read
    private boolean noInput;

    // NOTE(review): lineMapper, linesToSkip and skippedLinesCallback are stored by
    // their setters but never read in this class, and they are not forwarded
    // to the delegate. All parsing happens inside the delegate.
    private LineMapper<T> lineMapper;

    private int linesToSkip = 0;

    private LineCallbackHandler skippedLinesCallback;

    private Comparator<Resource> comparator = new Comparator<Resource>() {

        /**
         * Compares resource file names.
         */
        @Override
        public int compare(Resource r1, Resource r2) {
            return r1.getFilename().compareTo(r2.getFilename());
        }

    };

    public MultiResourceItemReader() {
        executionContextUserSupport.setName(ClassUtils.getShortName(MultiResourceItemReader.class));
    }

    /**
     * Sets the prefix used for this reader's keys in the {@link ExecutionContext}.
     * The default is the short class name {@code MultiResourceItemReader}. Use a
     * distinct name when another component sharing the step's execution
     * context stores keys under the same prefix, for example another reader
     * with the same short class name.
     *
     * @param name prefix for the {@code resourceIndex} and {@code itemIndex} keys
     */
    public void setName(String name) {
        executionContextUserSupport.setName(name);
    }

    /**
     * @param skippedLinesCallback stored for subclasses; not used by this class
     */
    public void setSkippedLinesCallback(LineCallbackHandler skippedLinesCallback) {
        this.skippedLinesCallback = skippedLinesCallback;
    }

    /**
     * Public setter for the number of lines to skip at the start of a file.
     * Stored for subclasses; not used by this class.
     *
     * @param linesToSkip the number of lines to skip
     */
    public void setLinesToSkip(int linesToSkip) {
        this.linesToSkip = linesToSkip;
    }

    /**
     * Setter for the line mapper. Stored for subclasses; not used by this class.
     *
     * @param lineMapper maps a line to an item
     */
    public void setLineMapper(LineMapper<T> lineMapper) {
        this.lineMapper = lineMapper;
    }

    /**
     * Reads the next item, moving on to the next resource if necessary.
     *
     * @return the next item, or {@code null} once every resource is exhausted
     *         (or immediately when there were no resources)
     */
    @Override
    public T read() throws Exception, UnexpectedInputException, ParseException {
        if (noInput) {
            return null;
        }
        T item = readNextItem();
        index.incrementItemCount();
        return item;
    }

    /**
     * Uses the delegate to read the next item. When the current resource is
     * exhausted, it closes the delegate, points it at the next resource and
     * reopens it with a fresh {@link ExecutionContext}.
     *
     * @return next item from input, or {@code null} when no resources remain
     */
    private T readNextItem() throws Exception {
        T item = delegate.read();

        while (item == null) {
            index.incrementResourceCount();

            if (index.currentResource >= resources.length) {
                return null;
            }

            delegate.close();
            delegate.setResource(resources[index.currentResource]);
            delegate.open(new ExecutionContext());

            item = delegate.read();
        }

        return item;
    }

    /**
     * Closes the {@link #setDelegate(ResourceAwareItemReaderItemStream) delegate},
     * if one is set, and resets the instance state.
     */
    @Override
    public void close() throws ItemStreamException {
        index = new MultiResourceIndex();
        if (delegate != null) {
            delegate.close();
        }
        noInput = false;
    }

    /**
     * Determines which resource to start with (for restarts), opens the
     * delegate on it and restores the delegate's position by re-reading the
     * already consumed items.
     *
     * @throws IllegalArgumentException if resources are not set, or if the
     *             delegate is not set and there is input
     * @throws ItemStreamException if the restored resource index is beyond the
     *             available resources, or the position cannot be restored
     */
    @Override
    public void open(ExecutionContext executionContext) throws ItemStreamException {
        Assert.notNull(resources, "Resources must be set");

        noInput = false;
        if (resources.length == 0) {
            LOGGER.warn("No resources to read");
            noInput = true;
            return;
        }
        Assert.notNull(delegate, "Delegate must be set");

        Arrays.sort(resources, comparator);
        for (Resource resource : resources) {
            LOGGER.info("Resources after Sorting: {}", resource);
        }

        index.open(executionContext);

        if (index.currentResource >= resources.length) {
            // readNextItem() leaves currentResource == resources.length after the
            // last resource is exhausted, so a restart from that state has nothing
            // left to read. Anything larger means the saved state does not match
            // the current resources.
            if (index.currentResource == resources.length) {
                noInput = true;
                return;
            }
            throw new ItemStreamException("Restored resource index " + index.currentResource
                    + " is out of range for " + resources.length + " resource(s)");
        }

        delegate.setResource(resources[index.currentResource]);
        delegate.open(new ExecutionContext());

        try {
            for (int i = 0; i < index.currentItem; i++) {
                delegate.read();
            }
        }
        catch (Exception e) {
            throw new ItemStreamException("Could not restore position on restart", e);
        }
    }

    /**
     * Stores the current resource index and the position within that resource.
     */
    @Override
    public void update(ExecutionContext executionContext) throws ItemStreamException {
        if (saveState) {
            index.update(executionContext);
        }
    }

    /**
     * @param delegate reads items from a single {@link Resource}
     */
    public void setDelegate(ResourceAwareItemReaderItemStream<? extends T> delegate) {
        this.delegate = delegate;
    }

    /**
     * Sets whether state should be saved in the provided
     * {@link ExecutionContext} during the {@link ItemStream} call to update.
     *
     * @param saveState whether to persist the reader position
     */
    public void setSaveState(boolean saveState) {
        this.saveState = saveState;
    }

    /**
     * @param comparator used to order the injected resources; by default
     *            compares {@link Resource#getFilename()} values
     */
    public void setComparator(Comparator<Resource> comparator) {
        this.comparator = comparator;
    }

    /**
     * @param resources input resources (sorted in place by {@link #open(ExecutionContext)})
     */
    public void setResources(Resource[] resources) {
        this.resources = resources;
    }

    /**
     * Tracks the position within the multi-resource input.
     */
    private class MultiResourceIndex {

        private static final String RESOURCE_KEY = "resourceIndex";

        private static final String ITEM_KEY = "itemIndex";

        private int currentResource = 0;

        private int markedResource = 0;

        private int currentItem = 0;

        private int markedItem = 0;

        public void incrementItemCount() {
            currentItem++;
        }

        public void incrementResourceCount() {
            currentResource++;
            currentItem = 0;
        }

        public void mark() {
            markedResource = currentResource;
            markedItem = currentItem;
        }

        public void reset() {
            currentResource = markedResource;
            currentItem = markedItem;
        }

        public void open(ExecutionContext ctx) {
            if (ctx.containsKey(executionContextUserSupport.getKey(RESOURCE_KEY))) {
                currentResource = ctx.getInt(executionContextUserSupport.getKey(RESOURCE_KEY));
            }

            if (ctx.containsKey(executionContextUserSupport.getKey(ITEM_KEY))) {
                currentItem = ctx.getInt(executionContextUserSupport.getKey(ITEM_KEY));
            }
        }

        public void update(ExecutionContext ctx) {
            ctx.putInt(executionContextUserSupport.getKey(RESOURCE_KEY), currentResource);
            ctx.putInt(executionContextUserSupport.getKey(ITEM_KEY), currentItem);
        }
    }

    /**
     * Validates that a delegate has been configured.
     *
     * @throws IllegalArgumentException if no delegate is set
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(delegate, "Delegate must be set");
    }

    /**
     * Restricts this reader to the single given resource. This lets it honour
     * the {@link ResourceAwareItemReaderItemStream} contract when it is itself
     * used as a delegate. It routes through {@link #setResources(Resource[])},
     * so subclass overrides of that method observe the change.
     *
     * @param resource the only resource to read
     * @throws IllegalArgumentException if {@code resource} is {@code null}
     */
    @Override
    public void setResource(Resource resource) {
        Assert.notNull(resource, "Resource must not be null");
        setResources(new Resource[] { resource });
    }

}

The Spring configuration is:

<batch:step id="readFromCSVFileAndUploadToDB" next="stepMovePdwFile">
            <batch:tasklet transaction-manager="transactionManager">
                <batch:chunk reader="multiResourceReader" writer="rawItemDatabaseWriter"
                    commit-interval="500" skip-policy="pdwUploadSkipPolicy" />
            </batch:tasklet>
        </batch:step>

<!-- Spring Batch's MultiResourceItemReader exposes a "resources" (plural) property;
     a "resource" property does not exist and fails bean creation. Each matching
     file is handed to the delegate one at a time via setResource(..). -->
<bean id="multiResourceReader"
        class="org.springframework.batch.item.file.MultiResourceItemReader" scope="step">
        <property name="resources" value="file:#{jobParameters[filePath]}/*.dat" />
        <property name="delegate" ref="rawItemCsvReader"></property>
    </bean>


    <!-- NOTE(review): the delegate should read one file at a time, as supplied by
         multiResourceReader. Its own "resources" pattern duplicates the outer one;
         consider a standard FlatFileItemReader as the delegate instead. -->
    <bean id="rawItemCsvReader" class="org.kp.oppr.remediation.batch.csv.RawItemCsvReader"
        scope="step">
        <property name="resources" value="file:#{jobParameters[filePath]}/*.dat" />
        <property name="columnNames" value="${columnNames}" />
        <property name="delimiter" value="${delimiter}" />
    </bean>
1

1 Answer

0
votes

Use a standard FlatFileItemReader (properly configured via XML) as the delegate instead of your RawItemCsvReader.
This answers your question because FlatFileItemReader implements ResourceAwareItemReaderItemStream, which is the type the delegate property expects (via AbstractItemCountingItemStreamItemReader, it also extends AbstractItemStreamItemReader).
Remember: Spring Batch is heavily based on delegation; writing a class like your reader is rarely necessary.