I am trying to use spring batch to read file from a .dat file and persist the data into database. My requirement says to either insert all of the data or insert none of the data into table i.e, atomicity. However, using spring batch i'm not able to achieve the same it is reading data in chunks and is inserting data as long as the records are fine. if at some point the record is inappropriate and some db exception is thrown then i want complete rollback which is not happening. Let's say we get error at 2051th record then my code saves 2050 records but i want complete rollback and if all data is good then all N records should be persisted. Thanks in advance for any help or relevant approach that may solve my issue...
NOTE: I have already used Spring Transactional annotation on caller method but it's not working and i'm reading data in a chunk size of 10 items.
MyConfiguration.java
@Configuration
public class MyConfiguration
{
@Autowired
JobBuilderFactory jobBuilderFactory;
@Autowired
StepBuilderFactory stepBuilderFactory;
@Autowired
@Qualifier("MyCompletionListener")
JobCompletionNotificationListener jobCompletionNotificationListener;
@StepScope
@Bean(name="MyReader")
public FlatFileItemReader<InputMapperDTO> reader(@Value("#{jobParameters['fileName']}") String fileName) throws IOException
{
FlatFileItemReader<InputMapperDTO> newBean = new FlatFileItemReader<>();
newBean.setName("MyReader");
newBean.setResource(new InputStreamResource(FileUtils.openInputStream(new File(fileName))));
newBean.setLineMapper(lineMapper());
newBean.setLinesToSkip(1);
return newBean;
}
@Bean(name="MyLineMapper")
public DefaultLineMapper<InputMapperDTO> lineMapper()
{
DefaultLineMapper<InputMapperDTO> lineMapper = new DefaultLineMapper<>();
lineMapper.setLineTokenizer(lineTokenizer());
Reader reader = new Reader();
lineMapper.setFieldSetMapper(reader);
return lineMapper;
}
@Bean(name="MyTokenizer")
public DelimitedLineTokenizer lineTokenizer()
{
DelimitedLineTokenizer tokenizer = new DelimitedLineTokenizer();
tokenizer.setDelimiter("|");
tokenizer.setNames("InvestmentAccountUniqueIdentifier", "BaseCurrencyUniqueIdentifier",
"OperatingCurrencyUniqueIdentifier", "PricingHierarchyUniqueIdentifier", "InvestmentAccountNumber",
"DummyAccountIndicator", "InvestmentAdvisorCompanyNumberLegacy","HighNetWorthAccountTypeCode");
tokenizer.setIncludedFields(0, 5, 7, 13, 29, 40, 49,75);
return tokenizer;
}
@Bean(name="MyBatchProcessor")
public ItemProcessor<InputMapperDTO, FinalDTO> processor()
{
return new Processor();
}
@Bean(name="MyWriter")
public ItemWriter<FinalDTO> writer()
{
return new Writer();
}
@Bean(name="MyStep")
public Step step1() throws IOException
{
return stepBuilderFactory.get("MyStep")
.<InputMapperDTO, FinalDTO>chunk(10)
.reader(this.reader(null))
.processor(this.processor())
.writer(this.writer())
.build();
}
@Bean(name=MyJob")
public Job importUserJob(@Autowired @Qualifier("MyStep") Step step1)
{
return jobBuilderFactory
.get("MyJob"+new Date())
.incrementer(new RunIdIncrementer())
.listener(jobCompletionNotificationListener)
.flow(step1)
.end()
.build();
}
}
Writer.java
public class Writer implements ItemWriter<FinalDTO>
{
@Autowired
SomeRepository someRepository;
@Override
public void write(List<? extends FinalDTO> listOfObjects) throws Exception
{
someRepository.saveAll(listOfObjects);
}
}
JobCompletionNotificationListener.java
public class JobCompletionNotificationListener extends JobExecutionListenerSupport
{
@Override
public void afterJob(JobExecution jobExecution)
{
if(jobExecution.getStatus() == BatchStatus.COMPLETED)
{
System.err.println("****************************************");
System.err.println("***** Batch Job Completed ******");
System.err.println("****************************************");
}
else
{
System.err.println("****************************************");
System.err.println("***** Batch Job Failed ******");
System.err.println("****************************************");
}
}
}
MyCallerMethod
@Transactional
public String processFile(String datFile) throws JobExecutionAlreadyRunningException, JobRestartException,
JobInstanceAlreadyCompleteException, JobParametersInvalidException
{
long st = System.currentTimeMillis();
JobParametersBuilder builder = new JobParametersBuilder();
builder.addString("fileName",datFile);
builder.addDate("date", new Date());
jobLauncher.run(job, builder.toJobParameters());
System.err.println("****************************************");
System.err.println("***** Total time consumed = "+(System.currentTimeMillis()-st)+" ******");
System.err.println("****************************************");
return response;
}