
I have a directory that a Camel route writes files into. I need multiple threads to consume the files from that same directory and process them.

<route id="processMessagesFromDirectory">
    <from uri="file:/directory?readLock=changed" />
    <threads poolSize="8" />
    <doTry>
        <log message="process Initiated for ${body}" />
        <doCatch>
            <exception>java.lang.Exception</exception>
        </doCatch>
    </doTry>
    <log message="Processed ${body}" />
</route>

I know that with a single thread, readLock=changed adds a delay of at least 1000 ms per file. With multiple threads the same thing happens: one thread waits 1000 ms, picks up a file and processes it, then another thread picks up the next file and processes it, and so on. What exactly does the readLock=changed option do? How can I introduce parallelism?

I was able to process them in parallel by not using the readLock=markerFile option, but I don't want to process partially written files, and I don't want to process any file more than once. How can I achieve both?

There is a JIRA ticket about making the changed read lock faster. Currently it is sequential and therefore slow in some use cases. To get it working with your current Camel version, you can write your own, faster read lock strategy. - Claus Ibsen
Thanks @ClausIbsen. What would be the problem with using readLock=markerFile? Could multiple threads pick up pieces of the same file, or could a thread pick up a file while it is still being written? - vineeth kumar
Also @ClausIbsen, on the producer side I can use the tempPrefix strategy, so consumers won't read files that are still being written. The remaining problems are making my consumption idempotent, i.e. not consuming the same message more than once, and achieving parallelism. - vineeth kumar
Yes, by far the best strategy is for the producer/writer to write using a temporary name and then rename the file when it is ready to be picked up. - Claus Ibsen
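The approach agreed on in these comments could be sketched roughly as below. The writer uses a temporary name and renames the file when it is done. The reader consumes idempotently and hands the work to a thread pool. This is a sketch under assumptions, not a tested configuration. The `direct:incoming` source endpoint and the `.inprogress-` prefix are hypothetical. `readLock=none` is only reasonable if every writer to the directory follows the temp-name-then-rename pattern.

```xml
<routes xmlns="http://camel.apache.org/schema/spring">

    <!-- Writer. "direct:incoming" is a hypothetical source endpoint.
         tempPrefix makes the file producer write to ".inprogress-NAME"
         and rename it to NAME only after the write has completed. -->
    <route id="writeMessagesToDirectory">
        <from uri="direct:incoming" />
        <to uri="file:/directory?tempPrefix=.inprogress-" />
    </route>

    <!-- Reader. The file consumer skips names starting with ".", so
         files that are still being written are never picked up.
         readLock=none: assumes all writers use the rename pattern above.
         idempotent=true: remembers consumed file names so that a file is
         not consumed twice (in-memory by default, so lost on restart). -->
    <route id="processMessagesFromDirectory">
        <from uri="file:/directory?readLock=none&amp;idempotent=true" />
        <threads poolSize="8" />
        <log message="Processing ${file:name}" />
    </route>

</routes>
```

To keep duplicates out across restarts, you would need a persistent repository referenced through the `idempotentRepository` option. Which implementation is available depends on your Camel version.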

Answer


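If you need a read lock, you can set readLock to markerFile. Acquiring a marker-file lock does not wait for the file to stop changing. The consumer can therefore hand files to the thread pool without a per-file delay, and they are processed in parallel.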

<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="processMessagesFromDirectory">
        <from uri="file:/directory?readLock=markerFile" />
        <threads poolSize="8" />
        <doTry>
            <log message="process Initiated for ${body}" />
            <doCatch>
                <exception>java.lang.Exception</exception>
            </doCatch>
        </doTry>
        <log message="Processed ${body}" />
    </route>
</routes>
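Why `<threads>` alone did not help with `readLock=changed`: as far as I understand the file consumer, the read lock is acquired on the consumer's single polling thread, before the exchange ever reaches `<threads>`. With `changed`, acquiring the lock means checking the file's size and timestamp, sleeping for `readLockCheckInterval` (1000 ms by default), and checking again. Files therefore reach the pool roughly one per interval, however large the pool is. If you must keep `changed`, the sketch below shortens that wait. The values are illustrative only. `readLockMinAge` exists only in newer Camel versions, so check the file component documentation for yours.

```xml
<routes xmlns="http://camel.apache.org/schema/spring">
    <route id="processMessagesFromDirectory">
        <!-- readLockCheckInterval: pause between the size/timestamp checks
             (default 1000 ms); 200 is an illustrative value.
             readLockMinAge: a file last modified at least 5000 ms ago is
             locked on the first check, without sleeping (newer Camel only). -->
        <from uri="file:/directory?readLock=changed&amp;readLockCheckInterval=200&amp;readLockMinAge=5000" />
        <threads poolSize="8" />
        <log message="Processed ${file:name}" />
    </route>
</routes>
```

A file younger than `readLockMinAge` still holds up the polling thread while it ages, or until `readLockTimeout` expires. This mainly helps when most files have already been sitting in the directory for a while. The temp-name-then-rename approach from the comments avoids the wait altogether.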