4
votes

I have a requirement where I read a file with different kinds of input as below:

*JAMBEG,APP=000007,123456
AC,654321,“ABCD12121212121212”,23423423423424234,ABCDD,23423423423424234,2424,XYZ,ABC,TREX,000000002
AC,654321,“ABCD12121212121213”,23423423423424234,ABCDD,23423423423424234,2424,XYZ,ABC, TREX,000000002
...
AC,654321,“ABCD12121212121214”,23423423423424234,ABCDD,23423423423424234,2424,XYZ,ABC, TREX,000000002
*JAMEND,APP=000007,123456
EOF

I need only the Header line and the records following that, ignoring the line that starts with TREX, *JAMEND, EOF.

Here is how my line mapper is:

public LineMapper<Customer> lineMapper(){

    DelimitedLineTokenizer lineTokenizerHeader = new DelimitedLineTokenizer();
    lineTokenizerHeader.setNames(new String[]{"association","companyNumber","fileDate"});
    lineTokenizerHeader.setIncludedFields(new int[]{0,1,2});
    lineTokenizerHeader.setStrict(false);

    DelimitedLineTokenizer lineTokenizerBody = new DelimitedLineTokenizer();
    lineTokenizerBody.setNames(new String[]{"type","acNumber","orderNumber"});
    lineTokenizerBody.setIncludedFields(new int[]{0,1,2});
    lineTokenizerBody.setStrict(false);


    HashMap<String, DelimitedLineTokenizer> tokenizers = new HashMap<String, DelimitedLineTokenizer>();
    tokenizers.put("*BEG*", lineTokenizerHeader);
    tokenizers.put("AC*", lineTokenizerBody);

    BeanWrapperFieldSetMapper<Customer> beanWrapperFieldSetMapper = new BeanWrapperFieldSetMapper<Customer>();
    beanWrapperFieldSetMapper.setTargetType(Customer.class);
    beanWrapperFieldSetMapper.setStrict(false);

    HashMap<String, BeanWrapperFieldSetMapper<Customer>> fieldSetMappers = new HashMap<String, BeanWrapperFieldSetMapper<Customer>>();
    fieldSetMappers.put("*BEG*", beanWrapperFieldSetMapper);
    fieldSetMappers.put("AC*", beanWrapperFieldSetMapper);

    PatternMatchingCompositeLineMapper patternMatchingCompositeLineMapper = new PatternMatchingCompositeLineMapper();
    patternMatchingCompositeLineMapper.setTokenizers(tokenizers);
    patternMatchingCompositeLineMapper.setFieldSetMappers(fieldSetMappers);

    return patternMatchingCompositeLineMapper;
}

Its my obvious mistake that I don't have any mapping for TREX, *JAMEND, EOF patterns. Hence it throws the below exception:

2014-06-16 16:49:34,746 [main] DEBUG org.springframework.batch.core.step.item.FaultTolerantChunkProvider - Parsing error at line: 5 in resource=[class path resource [0000123456.csv]], input=[EOF] : org.springframework.batch.item.file.FlatFileParseException 2014-06-16 16:49:34,746 [main] DEBUG org.springframework.batch.core.step.item.FaultTolerantChunkProvider - Skipping failed input org.springframework.batch.item.file.FlatFileParseException: Parsing error at line: 5 in resource=[class path resource [0000123456.csv]], input=[EOF] at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:183) at org.springframework.batch.item.support.AbstractItemCountingItemStreamItemReader.read(AbstractItemCountingItemStreamItemReader.java:83) at org.springframework.batch.core.step.item.SimpleChunkProvider.doRead(SimpleChunkProvider.java:91) at org.springframework.batch.core.step.item.FaultTolerantChunkProvider.read(FaultTolerantChunkProvider.java:87) at org.springframework.batch.core.step.item.SimpleChunkProvider$1.doInIteration(SimpleChunkProvider.java:114) at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:368) at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) at org.springframework.batch.core.step.item.SimpleChunkProvider.provide(SimpleChunkProvider.java:108) at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:69) at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:402) at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:326) at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130) at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:267) at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:77) at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:368) at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215) at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144) at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:253) at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:198) at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148) at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:386) at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135) at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:304) at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135) at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48) at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:318) at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150) at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:117) at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172) at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202) at com.sun.proxy.$Proxy17.run(Unknown Source) at com.chofac.pm.batch.CustomerFileToDBJobTest.testLaunchJob(CustomerFileToDBJobTest.java:48) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) at java.lang.reflect.Method.invoke(Method.java:597) at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45) at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15) at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42) at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20) at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28) at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:74) at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:83) at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:72) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:231) at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47) at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231) at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60) at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229) at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50) at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222) at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:71) at org.junit.runners.ParentRunner.run(ParentRunner.java:300) at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:174) at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50) at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390) at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197) Caused by: java.lang.IllegalStateException: Could not find a matching pattern for key=[EOF] at org.springframework.batch.support.PatternMatcher.match(PatternMatcher.java:226) at org.springframework.batch.item.file.mapping.PatternMatchingCompositeLineMapper.mapLine(PatternMatchingCompositeLineMapper.java:62) at org.springframework.batch.item.file.FlatFileItemReader.doRead(FlatFileItemReader.java:180) ... 67 more

I looked at many examples, this one matched close, and changed my step as below, but still the same issue.

@Bean
public Step step(){
    return stepBuilders.get("step")
    .<Customer,Customer>chunk(1)
    .reader(CustomerAUFileReader())
    .faultTolerant()
    .skipLimit(3)
    .skip(Exception.class)
    .processor(CustomerRecordProcessor())
    .writer(CustomerDBWriter())
    .listener(logProcessListener())
    .build();
}

Looked at the Spring.io docs here for skipping records (5.1.5 Configuring Skip Logic), does not work either.

Please let me know the ideal way to get around this issue. Should there not be an easy way to specify skipping records that do not match specific cases? Please advise. Thanks.

---

I have a pattern mapper for '*' which I am mapping with a dummy class. I am returning null at the process stage but its throwing nullpointerexception.

Stack Trace:

2014-06-17 10:03:01,690 [main] DEBUG org.springframework.batch.core.step.item.FaultTolerantChunkProcessor - Skipping after failed process
org.springframework.batch.core.listener.StepListenerFailedException: Error in afterProcess.
    at org.springframework.batch.core.listener.MulticasterBatchListener.afterProcess(MulticasterBatchListener.java:136)
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:127)
    at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor$1.doWithRetry(FaultTolerantChunkProcessor.java:225)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:263)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:193)
    at org.springframework.batch.core.step.item.BatchRetryTemplate.execute(BatchRetryTemplate.java:217)
    at org.springframework.batch.core.step.item.FaultTolerantChunkProcessor.transform(FaultTolerantChunkProcessor.java:290)
    at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:192)
    at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:402)
    at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:326)
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:130)
    at org.springframework.batch.core.step.tasklet.TaskletStep$2.doInChunkContext(TaskletStep.java:267)
    at org.springframework.batch.core.scope.context.StepContextRepeatCallback.doInIteration(StepContextRepeatCallback.java:77)
    at org.springframework.batch.repeat.support.RepeatTemplate.getNextResult(RepeatTemplate.java:368)
    at org.springframework.batch.repeat.support.RepeatTemplate.executeInternal(RepeatTemplate.java:215)
    at org.springframework.batch.repeat.support.RepeatTemplate.iterate(RepeatTemplate.java:144)
    at org.springframework.batch.core.step.tasklet.TaskletStep.doExecute(TaskletStep.java:253)
    at org.springframework.batch.core.step.AbstractStep.execute(AbstractStep.java:198)
    at org.springframework.batch.core.job.SimpleStepHandler.handleStep(SimpleStepHandler.java:148)
    at org.springframework.batch.core.job.AbstractJob.handleStep(AbstractJob.java:386)
    at org.springframework.batch.core.job.SimpleJob.doExecute(SimpleJob.java:135)
    at org.springframework.batch.core.job.AbstractJob.execute(AbstractJob.java:304)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher$1.run(SimpleJobLauncher.java:135)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:48)
    at org.springframework.batch.core.launch.support.SimpleJobLauncher.run(SimpleJobLauncher.java:128)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.springframework.aop.support.AopUtils.invokeJoinpointUsingReflection(AopUtils.java:318)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.invokeJoinpoint(ReflectiveMethodInvocation.java:183)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:150)
    at org.springframework.batch.core.configuration.annotation.SimpleBatchConfiguration$PassthruAdvice.invoke(SimpleBatchConfiguration.java:117)
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:172)
    at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:202)
    at com.sun.proxy.$Proxy17.run(Unknown Source)
    at com.chofac.pl.batch.CustomerFileToDBJobTest.testLaunchJob(CustomerFileToDBJobTest.java:48)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
    at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:74)
    at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:83)
    at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:72)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:231)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
    at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:71)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:174)
    at org.eclipse.jdt.internal.junit4.runner.JUnit4TestReference.run(JUnit4TestReference.java:50)
    at org.eclipse.jdt.internal.junit.runner.TestExecution.run(TestExecution.java:38)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:467)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.runTests(RemoteTestRunner.java:683)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.run(RemoteTestRunner.java:390)
    at org.eclipse.jdt.internal.junit.runner.RemoteTestRunner.main(RemoteTestRunner.java:197)
Caused by: java.lang.NullPointerException
    at com.chofac.pl.batch.CustomerItemProcessListener.afterProcess(CustomerItemProcessListener.java:13)
    at org.springframework.batch.core.listener.CompositeItemProcessListener.afterProcess(CompositeItemProcessListener.java:60)
    at org.springframework.batch.core.listener.MulticasterBatchListener.afterProcess(MulticasterBatchListener.java:133)
5

5 Answers

5
votes

One approach:

Have your ItemReader simply read a line and return as is. Therefore the items given by the reader will be a simple String.

Write a simple ItemProcessor, which mostly do the work of your LineMapper, base on a pattern for example: if the item matches with a pattern, then translate the input string to your Customer return. If pattern not matching, simply return null and the item will be skipped.

psuedo code for the item processor:

class CustomPatternMatchingItemProcessor<String, Customer> 
        implements ItemProcessor<String, Customer> {
    private String pattern;

    public Customer process(String s) {
        if (s matches pattern) {
            construct Customer object base on s
            return customer
        } else {
            return null;
        }
    }
}

Or even cleaner: have one processor do the work of mapping from String to Customer, and another processor to do the validation of string base on regex.

just chain your processors using a CompositeItemProcessor. This give a even better separation of concern for each of your processor.

2
votes

I have a similiar issue, where the file I am sent will always have some lines that don't fit the correct pattern. As I just want to ignore these lines, I log them, and skip them.

You can implement org.springframework.batch.repeat.exception.ExceptionHandler like such:

class LocalExceptionHandler implements ExceptionHandler {

    @Override
    public void handleException(RepeatContext rc, Throwable throwable) throws Throwable {
        if (throwable instanceof FlatFileParseException) {
            FlatFileParseException fe = (FlatFileParseException)throwable;
            log.error("!!!! FlatFileParseException, line # is: " + fe.getLineNumber());
            log.error("!!!! FlatFileParseException, input is: " + fe.getInput());
        }
        log.error("!!!! Message : " + throwable.getMessage());
        log.error("!!!! Cause : " + throwable.getCause());      
    }
}

and then add that to your step builder:

faultTolerantStepBuilder.exceptionHandler(new LocalExceptionHandler());
faultTolerantStepBuilder.skipLimit(100);
1
votes

Your intention is not to skip object due to errors, but skip records with a logic; I think your best option is put a mapper binded to '*' and return a custom object (like a SkippableRecordBean) instead of a Customer and filter out unwanted beans in ItemProcessor.

0
votes

We started returning Dummy object from the LineMapper instead of null as null causes the reader to skip reading other lines. In the ItemWriter, the write method still gets this dummy object and you need to validate before processing it. We ignore the dummy records in the ItemWriter.

0
votes
public class FileVerificationSkipper implements SkipPolicy {

    @Autowired
    private Environment environment;

    @Override
    public boolean shouldSkip(Throwable exception, int skipCount) throws SkipLimitExceededException {
        int skipErrorsRecords = Integer.valueOf(environment.getProperty("max.error.record.count"));
        if (exception instanceof FileNotFoundException) {
            return false;
        } else if (exception instanceof FlatFileParseException && (skipErrorsRecords < 0 || skipCount <= (skipErrorsRecords-1))) {
            FlatFileParseException ffpe = (FlatFileParseException) exception;
            StringBuilder errorMessage = new StringBuilder();
            errorMessage.append((skipCount+1)+", An error occured while processing the " + ffpe.getLineNumber() + " record of the file. the faulty record is:\n");
            errorMessage.append(ffpe.getInput() + "\n");
            return true;
        } else {
            return false;
        }
    }
}

and StepBuilder is:

@Bean
public Step step1() throws Exception{
    return stepBuilderFactory.get("step1")
            .<User, User> chunk(50)
            .reader(reader())
            .faultTolerant()
            .skipPolicy(fileVerificationSkipper())
            .processor(processor())
            .writer(writer())
            .build();
}