7
votes

I’m investigating the use of Spring Batch to process records from an encoded, zipped file. The records are variable length, with nested variable-length data fields encoded within them.

I’m new to Spring and Spring Batch; this is how I plan to structure the batch configuration.

  • The ItemReader would need to read a single record from the zipped (*.gz) file input stream into a POJO (byte array); the length of each record is contained in its first two bytes.
  • The ItemProcessor will decode the byte array and store info in relevant attributes in the POJO.
  • The ItemWriter would populate a database.

My initial problem is understanding how to set up the ItemReader. I’ve looked at some examples that use a FlatFileItemReader, but my difficulty is the expectation of a LineMapper. I don’t see how that applies in my case (there is no concept of a line in the file).

There are some articles indicating the use of a custom BufferedReaderFactory, but it would be great to see a worked example of this.

Help would be appreciated.

4
Can you add examples of records from the decoded file? – Michael Pralow
Sorry for the delay replying. My confusion was based around the file handling in the custom ItemReader: if I were to open and process the file in the read() method, I would have to keep track of where I was in the file, etc. I managed to tackle this by creating a BufferedInputStream (new BufferedInputStream(new GZIPInputStream(new FileInputStream(file)))) in the constructor of the custom ItemReader, then processing that stream in the read() method with each iteration of the step. – Hugh Lacey

4 Answers

3
votes

If the gzipped file is a simple text file, you only need a custom BufferedReaderFactory; the LineMapper then gets the String of the current line.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.GZIPInputStream;
import org.springframework.batch.item.file.BufferedReaderFactory;
import org.springframework.core.io.Resource;

public class GZipBufferedReaderFactory implements BufferedReaderFactory {

    /** Default value for gzip suffixes. */
    private List<String> gzipSuffixes = new ArrayList<String>() {

        {
            add(".gz");
            add(".gzip");
        }
    };

    /**
     * Creates Bufferedreader for gzip Resource, handles normal resources
     * too.
     * 
     * @param resource
     * @param encoding
     * @return
     * @throws UnsupportedEncodingException
     * @throws IOException 
     */
    @Override
    public BufferedReader create(Resource resource, String encoding)
            throws UnsupportedEncodingException, IOException {
        for (String suffix : gzipSuffixes) {
            // test for filename and description, description is used when 
            // handling itemStreamResources
            if (resource.getFilename().endsWith(suffix)
                    || resource.getDescription().endsWith(suffix)) {
                return new BufferedReader(new InputStreamReader(new GZIPInputStream(resource.getInputStream()), encoding));
            }
        }
        return new BufferedReader(new InputStreamReader(resource.getInputStream(), encoding));
    }

    public List<String> getGzipSuffixes() {
        return gzipSuffixes;
    }

    public void setGzipSuffixes(List<String> gzipSuffixes) {
        this.gzipSuffixes = gzipSuffixes;
    }
}

simple itemreader configuration:

 <bean id="itemReader" class="org.springframework.batch.item.file.FlatFileItemReader" scope="step">
  <property name="resource" value="#{jobParameters['input.file']}" />
  <property name="lineMapper">
    <bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper" />
  </property>
  <property name="strict" value="true" />
  <property name="bufferedReaderFactory">
    <bean class="your.custom.GZipBufferedReaderFactory" />
  </property>
</bean>
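
If you prefer Java configuration, a rough equivalent of the XML above might look like this (just a sketch; it assumes the same GZipBufferedReaderFactory class and the input.file job parameter shown above):

@Bean
@StepScope
public FlatFileItemReader<String> itemReader(@Value("#{jobParameters['input.file']}") Resource resource) {
    FlatFileItemReader<String> reader = new FlatFileItemReader<>();
    reader.setResource(resource);                        // e.g. file:/path/to/data.gz
    reader.setLineMapper(new PassThroughLineMapper());   // each item is the raw line
    reader.setStrict(true);
    reader.setBufferedReaderFactory(new GZipBufferedReaderFactory());
    return reader;
}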
2
votes

From the feature request ticket to spring batch (https://jira.spring.io/browse/BATCH-1750):

import java.io.IOException;
import java.util.zip.GZIPInputStream;

import org.springframework.core.io.InputStreamResource;
import org.springframework.core.io.Resource;

public class GZIPResource extends InputStreamResource implements Resource {

    public GZIPResource(Resource delegate) throws IOException {
        super(new GZIPInputStream(delegate.getInputStream()));
    }
}

The custom GZipBufferedReaderFactory won't work with anything other than FlatFileItemReader.

Edit: lazy version. This doesn't try to open the file until getInputStream() is called, which avoids exceptions caused by the file not yet existing when the Resource is created at program initialization (e.g. with autowiring).

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;

public class GzipLazyResource extends FileSystemResource implements Resource {

    public GzipLazyResource(File file) {
        super(file);
    }

    public GzipLazyResource(String path) {
        super(path);
    }

    @Override
    public InputStream getInputStream() throws IOException {
        // decompress lazily, only when the stream is actually requested
        return new GZIPInputStream(super.getInputStream());
    }
}

Edit 2: this only works for input Resources.

Adding a similar getOutputStream method won't work, because Spring uses FileSystemResource.getFile, not FileSystemResource.getOutputStream.
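
For illustration, a step-scoped reader using the lazy resource could be wired roughly like this (a sketch; the input.file job parameter is just an example):

@Bean
@StepScope
public FlatFileItemReader<String> reader(@Value("#{jobParameters['input.file']}") String path) {
    FlatFileItemReader<String> reader = new FlatFileItemReader<>();
    // the gzip stream is only opened once the step starts reading
    reader.setResource(new GzipLazyResource(path));
    reader.setLineMapper(new PassThroughLineMapper());
    return reader;
}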

2
votes

Tested that this simple configuration for reading lines from a zipped and encoded file in S3 works.

Key points:

  • Implement a BufferedReaderFactory that uses Apache's GZIPInputStreamFactory, and set that as the bufferedReaderFactory on the FlatFileItemReader.
  • Configure a SimpleStorageResourceLoader from Spring Cloud with an AmazonS3Client, and use it to get the zipped flat file in S3. Set that as the resource on the FlatFileItemReader.

Note: reading into a String can easily be replaced by reading into a POJO (see the LineMapper sketch after the configuration below).

GZIPBufferedReaderFactory.java

Using Apache's GZIPInputStreamFactory

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

// assumes Apache HttpClient's GZIPInputStreamFactory is the one referred to above
import org.apache.http.client.entity.GZIPInputStreamFactory;
import org.springframework.batch.item.file.BufferedReaderFactory;
import org.springframework.core.io.Resource;

public class GZIPBufferedReaderFactory implements BufferedReaderFactory {

    private final GZIPInputStreamFactory gzipInputStreamFactory;

    public GZIPBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
        this.gzipInputStreamFactory = gzipInputStreamFactory;
    }

    @Override
    public BufferedReader create(Resource resource, String encoding) throws IOException {
        return new BufferedReader(new InputStreamReader(gzipInputStreamFactory.create(resource.getInputStream()), encoding));
    }
}

AWSConfiguration.java

@Configuration
public class AWSConfiguration {

    @Bean
    public AmazonS3Client s3Client(AWSCredentialsProvider credentials, Region region) {
        ClientConfiguration clientConfig = new ClientConfiguration();

        AmazonS3Client client = new AmazonS3Client(credentials, clientConfig);
        client.setRegion(region);
        return client;
    }
}

How you configure the AWSCredentialsProvider and Region beans can vary, and I will not detail that here since there is documentation elsewhere.
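
For completeness, one possible way to define those two beans (a sketch using the AWS SDK v1 classes from com.amazonaws.auth and com.amazonaws.regions; adjust the region and credentials strategy to your environment):

@Bean
public AWSCredentialsProvider awsCredentialsProvider() {
    // picks up credentials from environment variables, system properties, profile files or instance roles
    return new DefaultAWSCredentialsProviderChain();
}

@Bean
public Region region() {
    return Region.getRegion(Regions.US_EAST_1); // example region only
}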

BatchConfiguration.java

@Configuration
@EnableBatchProcessing
public class SignalsIndexBatchConfiguration {

    @Autowired
    public AmazonS3Client s3Client;

    @Bean
    public GZIPInputStreamFactory gzipInputStreamFactory() {
        return new GZIPInputStreamFactory();
    }

    @Bean
    public GZIPBufferedReaderFactory gzipBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
        return new GZIPBufferedReaderFactory(gzipInputStreamFactory);
    }

    @Bean
    public SimpleStorageResourceLoader simpleStorageResourceLoader() {
        return new SimpleStorageResourceLoader(s3Client);
    }

    @Bean
    @StepScope
    protected FlatFileItemReader<String> itemReader(
            SimpleStorageResourceLoader simpleStorageResourceLoader,
            GZIPBufferedReaderFactory gzipBufferedReaderFactory) {
        FlatFileItemReader<String> flatFileItemReader = new FlatFileItemReader<>();
        flatFileItemReader.setBufferedReaderFactory(gzipBufferedReaderFactory);
        flatFileItemReader.setResource(simpleStorageResourceLoader.getResource("s3://YOUR_FLAT_FILE.csv"));
        flatFileItemReader.setLineMapper(new PassThroughLineMapper());
        return flatFileItemReader;
    }

    @Bean
    public Job job(Step step) {
        return jobBuilderFactory.get("job").start(step).build();
    }

    @Bean
    protected Step step(GZIPInputStreamFactory gzipInputStreamFactory) {
        return stepBuilderFactory.get("step")
                .<String, String> chunk(200)
                .reader(itemReader(simpleStorageResourceLoader(), gzipBufferedReaderFactory(gzipInputStreamFactory)))
                // note: itemProcessor() is not shown in this listing, and a chunk step also needs an ItemWriter supplied via .writer(...)
                .processor(itemProcessor())
                .faultTolerant()
                .build();
    }

    /*
     * These components are some of what we
     * get for free with the @EnableBatchProcessing annotation
     */
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public JobRepository jobRepository;
    /*
     * END Freebies
     */

    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository);
        jobLauncher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        jobLauncher.afterPropertiesSet();
        return jobLauncher;
    }
}
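
As noted above, the PassThroughLineMapper can be swapped for one that builds a POJO. A minimal sketch (the Record class and comma-separated layout are purely illustrative):

public class RecordLineMapper implements LineMapper<Record> {

    @Override
    public Record mapLine(String line, int lineNumber) throws Exception {
        // parse the line however your format requires; CSV is just an example
        String[] fields = line.split(",");
        return new Record(fields[0], fields[1]);
    }
}

Set it on the reader with flatFileItemReader.setLineMapper(new RecordLineMapper()) and change the reader's type parameter to Record.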
0
votes

My confusion was based around the file handling in the custom ItemReader: if I were to open and process the file in the read() method, I would have to keep track of where I was in the file, etc. I managed to tackle this by creating a BufferedInputStream (new BufferedInputStream(new GZIPInputStream(new FileInputStream(file)))) in the constructor of the custom ItemReader, then processing that stream in the read() method with each iteration of the step.
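
For anyone facing the same length-prefixed binary format, a minimal sketch of that kind of ItemReader might look like the following (it assumes the two-byte length is an unsigned big-endian value covering only the payload, and returns the raw record as a byte array for an ItemProcessor to decode):

import java.io.BufferedInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.zip.GZIPInputStream;

import org.springframework.batch.item.ItemReader;

public class GzipRecordItemReader implements ItemReader<byte[]> {

    private final DataInputStream in;

    public GzipRecordItemReader(File file) throws IOException {
        // open the stream once; the reader keeps its position across read() calls
        this.in = new DataInputStream(
                new BufferedInputStream(new GZIPInputStream(new FileInputStream(file))));
    }

    @Override
    public byte[] read() throws Exception {
        final int length;
        try {
            // the first two bytes of each record hold its length (assumption: unsigned, big-endian)
            length = in.readUnsignedShort();
        } catch (EOFException eof) {
            in.close();
            return null; // returning null tells Spring Batch the input is exhausted
        }
        byte[] record = new byte[length];
        in.readFully(record);
        return record;
    }
}

A fuller version would implement ItemStreamReader instead, opening the stream in open() and saving the record count in the ExecutionContext so the job can be restarted mid-file.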