0
votes

I am working a code which polls a SFTP folder and process them one by one then move them to a success folder once it is done. we run this camel instance more that 1. To process the file by one instance, we have a DB check like once a camel instance takes the file it creates a marker record and any other instances come to pick the file it will check this marker record and skip the process. so we have created a custom exception like Markerdocument available in a processor which will handle that. This marker exception will be handled at globally and we mark them handled "true" so file will not be moved /ERROR folder. Now the problem is, since we are handling this the route process thinks that it is successfully processed and moving the folder to /success which we don't want, this will be handled by the actual record processing camel instance. How do we achieve that?

onException(MarkerDocumentExistsException.class)
        .process("routeExceptionProcessor")
        .log("Marker document available so continue next message processing... ")
        .handled(true)
        .stop();    
        onException(Exception.class)
        .process("routeExceptionProcessor")
        .log("Nothing to do here, move the file to error folder... ")
        .redeliveryDelay(2000)
        .stop();
    from(getSftpOptions())
        .routeId("sftp-Route")
        .throttle(Integer.parseInt(appProp.throttleCount))
        .timePeriodMillis(1000*60)
        .onCompletion()
        .onCompleteOnly()
            .process("routeCompleteProcessor")
            .log("Route processing is Completed : ${header.CamelFileNameOnly}")
            .end()
        .log("Starting the file process: ${header.CamelFileNameOnly}")
        .process("routeCBCheckProcessor")
        .unmarshal().gzipDeflater()
        .split(body().tokenizeXML("example", "*"),new SimpleStringAggregator()).streaming()
            .parallelProcessing(true)
            .process("lineProcessor")
        .end()
        .log("aggregate is completed :${header.CamelFileNameOnly}")
1

1 Answers

1
votes

Well, you began to build an idempotent file consumer with an idempotent repository.

Have you tried to use the built-in idempotent file consumer of Camel (with optional repository included)?

I think this should also work for SFTP since the component has the idempotent option and the SFTP component is an extension of the file component.