I am working on code that polls an SFTP folder, processes the files one by one, and moves each file to a success folder once it is done. We run more than one instance of this Camel application. To make sure a file is processed by only one instance, we have a DB check: when a Camel instance takes a file it creates a marker record, and any other instance that comes to pick up the same file checks for this marker record and skips processing. For this we created a custom exception, MarkerDocumentExistsException, which is thrown from a processor. This marker exception is handled globally and marked handled(true) so that the file is not moved to the /ERROR folder. Now the problem is that, because we handle the exception, the route thinks the file was processed successfully and moves it to /success, which we don't want; that move should be done by the Camel instance that is actually processing the file. How do we achieve that?
onException(MarkerDocumentExistsException.class)
    .process("routeExceptionProcessor")
    .log("Marker document available so continue next message processing... ")
    .handled(true)
    .stop();

onException(Exception.class)
    .process("routeExceptionProcessor")
    .log("Nothing to do here, move the file to error folder... ")
    .redeliveryDelay(2000)
    .stop();

from(getSftpOptions())
    .routeId("sftp-Route")
    .throttle(Integer.parseInt(appProp.throttleCount))
        .timePeriodMillis(1000*60)
    .onCompletion()
        .onCompleteOnly()
        .process("routeCompleteProcessor")
        .log("Route processing is Completed : ${header.CamelFileNameOnly}")
    .end()
    .log("Starting the file process: ${header.CamelFileNameOnly}")
    .process("routeCBCheckProcessor")
    .unmarshal().gzipDeflater()
    .split(body().tokenizeXML("example", "*"), new SimpleStringAggregator()).streaming()
        .parallelProcessing(true)
        .process("lineProcessor")
    .end()
    .log("aggregate is completed :${header.CamelFileNameOnly}")
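
The processor that performs the marker check is not shown above. To make the behaviour concrete, here is a minimal sketch of the idea, not our actual implementation: `MarkerStore`, `claim` and `MarkerCheckDemo` are hypothetical names, and an in-memory set stands in for the DB marker table. The first caller creates the marker; any later caller for the same file gets the marker exception and should skip the file.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the custom exception named in the question.
class MarkerDocumentExistsException extends RuntimeException {
    MarkerDocumentExistsException(String fileName) {
        super("Marker document already exists for " + fileName);
    }
}

// Hypothetical in-memory stand-in for the DB marker table (assumption:
// the real check is an atomic "insert if absent" against the database).
class MarkerStore {
    private final Set<String> markers = ConcurrentHashMap.newKeySet();

    // Atomically creates the marker; throws if another instance already created it.
    void claim(String fileName) {
        if (!markers.add(fileName)) {
            throw new MarkerDocumentExistsException(fileName);
        }
    }
}

class MarkerCheckDemo {
    public static void main(String[] args) {
        MarkerStore store = new MarkerStore();
        store.claim("orders.xml.gz");       // first instance creates the marker
        try {
            store.claim("orders.xml.gz");   // second instance finds the marker
        } catch (MarkerDocumentExistsException e) {
            System.out.println("Skipping: " + e.getMessage());
        }
    }
}
```

In the route, this exception is what reaches the first `onException` clause; the open question is how to keep the file in place on the instance that skipped it.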