I tested that this simple configuration for reading lines from a gzipped, encoded file in S3 works.
Key points:
- Implement a `BufferedReaderFactory` that uses Apache's `GZIPInputStreamFactory`, and set that as the `bufferedReaderFactory` on the `FlatFileItemReader`.
- Configure a `SimpleStorageResourceLoader` from Spring Cloud with an `AmazonS3Client`, and use it to get the gzipped flat file in S3. Set that as the `resource` on the `FlatFileItemReader`.
Note: reading into a string can be easily replaced by reading into a POJO.
GZIPBufferedReaderFactory.java
Using Apache's GZIPInputStreamFactory
/**
 * {@link BufferedReaderFactory} that passes a resource's input stream through a
 * {@link GZIPInputStreamFactory} before wrapping it in a {@link BufferedReader}.
 */
public class GZIPBufferedReaderFactory implements BufferedReaderFactory {

    private final GZIPInputStreamFactory gzipInputStreamFactory;

    /**
     * @param gzipInputStreamFactory factory used to wrap the raw resource stream
     */
    public GZIPBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
        this.gzipInputStreamFactory = gzipInputStreamFactory;
    }

    /**
     * Opens {@code resource}, wraps its stream via the configured
     * {@link GZIPInputStreamFactory} and decodes it with {@code encoding}.
     *
     * @param resource the resource to read from
     * @param encoding charset name handed to {@link InputStreamReader}
     * @return a buffered reader over the wrapped stream
     * @throws IOException if the stream cannot be opened or wrapped, or the
     *         encoding is not supported
     */
    @Override
    public BufferedReader create(Resource resource, String encoding) throws IOException {
        // Fully qualified so this block does not depend on an extra import.
        java.io.InputStream rawStream = resource.getInputStream();
        try {
            return new BufferedReader(
                    new InputStreamReader(gzipInputStreamFactory.create(rawStream), encoding));
        } catch (IOException | RuntimeException e) {
            // Wrapping failed, so no reader exists that a caller could close;
            // release the already-opened raw stream here to avoid leaking it.
            try {
                rawStream.close();
            } catch (IOException closeFailure) {
                e.addSuppressed(closeFailure);
            }
            throw e;
        }
    }
}
AWSConfiguration.java
/**
 * Spring configuration that exposes the {@link AmazonS3Client} bean.
 */
@Configuration
public class AWSConfiguration {

    /**
     * Creates an S3 client from the given credentials provider and a
     * default-constructed {@link ClientConfiguration}, then sets its region.
     *
     * @param credentials provider passed to the client constructor
     * @param region region applied to the client via {@code setRegion}
     * @return the configured client
     */
    @Bean
    public AmazonS3Client s3Client(AWSCredentialsProvider credentials, Region region) {
        AmazonS3Client s3 = new AmazonS3Client(credentials, new ClientConfiguration());
        s3.setRegion(region);
        return s3;
    }
}
How you configure the `AWSCredentialsProvider` and `Region` beans can vary, and I will not detail that here since it is documented elsewhere.
SignalsIndexBatchConfiguration.java
/**
 * Spring Batch configuration for a single-step job whose reader takes its
 * resource from a {@link SimpleStorageResourceLoader} and its reader factory
 * from a {@link GZIPBufferedReaderFactory}.
 */
@Configuration
@EnableBatchProcessing
public class SignalsIndexBatchConfiguration {

    /** Injected S3 client, used to build the resource loader bean. */
    @Autowired
    public AmazonS3Client s3Client;

    /*
     * The next three components are some of what we get for free
     * with the @EnableBatchProcessing annotation.
     */
    @Autowired
    public JobBuilderFactory jobBuilderFactory;

    @Autowired
    public StepBuilderFactory stepBuilderFactory;

    @Autowired
    public JobRepository jobRepository;
    /* END of the @EnableBatchProcessing freebies. */

    /** @return a new {@link GZIPInputStreamFactory} */
    @Bean
    public GZIPInputStreamFactory gzipInputStreamFactory() {
        return new GZIPInputStreamFactory();
    }

    /**
     * @param gzipInputStreamFactory factory handed to the reader factory
     * @return a {@link GZIPBufferedReaderFactory} backed by the given factory
     */
    @Bean
    public GZIPBufferedReaderFactory gzipBufferedReaderFactory(GZIPInputStreamFactory gzipInputStreamFactory) {
        return new GZIPBufferedReaderFactory(gzipInputStreamFactory);
    }

    /** @return a resource loader built on the injected {@link #s3Client} */
    @Bean
    public SimpleStorageResourceLoader simpleStorageResourceLoader() {
        return new SimpleStorageResourceLoader(s3Client);
    }

    /**
     * Step-scoped reader that emits each line of the configured resource as a
     * plain {@code String} via {@link PassThroughLineMapper}.
     *
     * @param simpleStorageResourceLoader loader used to resolve the {@code s3://} location
     * @param gzipBufferedReaderFactory reader factory installed on the item reader
     * @return the configured reader
     */
    @Bean
    @StepScope
    protected FlatFileItemReader<String> itemReader(
            SimpleStorageResourceLoader simpleStorageResourceLoader,
            GZIPBufferedReaderFactory gzipBufferedReaderFactory) {
        FlatFileItemReader<String> reader = new FlatFileItemReader<>();
        reader.setLineMapper(new PassThroughLineMapper());
        // Placeholder location: replace with the real object path.
        reader.setResource(simpleStorageResourceLoader.getResource("s3://YOUR_FLAT_FILE.csv"));
        reader.setBufferedReaderFactory(gzipBufferedReaderFactory);
        return reader;
    }

    /**
     * @param step the step the job starts with
     * @return the job named {@code "job"}
     */
    @Bean
    public Job job(Step step) {
        return jobBuilderFactory
                .get("job")
                .start(step)
                .build();
    }

    /**
     * Builds the step named {@code "step"}: chunks of 200 items, read by
     * {@link #itemReader}, passed through {@code itemProcessor()}, with
     * fault tolerance switched on.
     *
     * @param gzipInputStreamFactory factory forwarded to the reader-factory bean method
     * @return the built step
     */
    @Bean
    protected Step step(GZIPInputStreamFactory gzipInputStreamFactory) {
        // NOTE(review): itemProcessor() is not declared in this class as shown;
        // it has to be supplied for this method to compile.
        return stepBuilderFactory
                .get("step")
                .<String, String> chunk(200)
                .reader(itemReader(
                        simpleStorageResourceLoader(),
                        gzipBufferedReaderFactory(gzipInputStreamFactory)))
                .processor(itemProcessor())
                .faultTolerant()
                .build();
    }

    /**
     * Creates a {@link SimpleJobLauncher} that uses the injected
     * {@link #jobRepository} and a {@link SimpleAsyncTaskExecutor}, and
     * initializes it with {@code afterPropertiesSet()}.
     *
     * @return the initialized launcher
     * @throws Exception if {@code afterPropertiesSet()} fails
     */
    @Bean
    public JobLauncher jobLauncher() throws Exception {
        SimpleJobLauncher launcher = new SimpleJobLauncher();
        launcher.setTaskExecutor(new SimpleAsyncTaskExecutor());
        launcher.setJobRepository(jobRepository);
        launcher.afterPropertiesSet();
        return launcher;
    }
}