I've recently started exploring the big-data world and am experimenting with Apache Storm. I have run into the following problem and have thought a lot about how to solve it, but all my approaches seem naïve.
Technologies
Apache Storm 0.9.3, Java 1.8.0_20
Context
There is a big XML file (~400 MB) that needs to be read line by line (xml-file-spout). Each line read from the file is then emitted and processed by a chain of bolts.
Guaranteed message processing is required (emitting with anchoring...).
Problem
Since the file is pretty big (it contains about 20 billion lines), I read it with a scanner based on a buffered stream, so as not to load the whole file into memory. So far so good. The problem emerges when there is an error somewhere in the middle of processing: the xml-file-spout itself dies, or there is some internal issue...
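For illustration, the streaming read described above can be sketched with a plain `BufferedReader` (the class and method names here are hypothetical, not part of my actual topology); the running line counter is the piece of state a checkpoint could later be built on:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Consumer;

// Minimal sketch of streaming a large file line by line without
// loading it into memory; the returned counter is the number of
// lines handed to the consumer.
class LineStreamer {
    public static long streamLines(Path file, Consumer<String> emit)
            throws IOException {
        long lineNumber = 0;
        try (BufferedReader reader = Files.newBufferedReader(file)) {
            String line;
            while ((line = reader.readLine()) != null) {
                lineNumber++;
                emit.accept(line); // in a spout this would be collector.emit(...)
            }
        }
        return lineNumber;
    }
}
```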
- Nimbus will restart the spout, but then the whole processing starts again from the very beginning;
- This approach does not scale at all.
Solution Thoughts
An initial idea for solving the first problem was to save the current state somewhere: a distributed cache, a JMS queue, a local disk file. When the spout opens, it should find such storage, read the state and proceed from the specified file line. Here I also thought about storing the state in Storm's Zookeeper, but I don't know whether it is possible to address Zookeeper from the spout (is there such an ability)? Could you please suggest a best practice for this?
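As a sketch of the state-saving idea (all names here are hypothetical), the spout could persist the number of the last successfully processed line and read it back in `open()`. In this minimal version the store is a local file, but the same bytes could just as well live in a Zookeeper znode or be replayed from a JMS queue:

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical checkpoint store: persists the number of the last
// successfully processed line so a restarted spout can resume from it.
// A production version could keep the same payload in Zookeeper.
class LineCheckpoint {
    private final Path store;

    LineCheckpoint(Path store) {
        this.store = store;
    }

    public void save(long lastProcessedLine) throws IOException {
        Files.write(store,
                Long.toString(lastProcessedLine).getBytes(StandardCharsets.UTF_8));
    }

    public long restore() throws IOException {
        if (!Files.exists(store)) {
            return 0L; // no checkpoint yet: start from the beginning
        }
        String text = new String(Files.readAllBytes(store), StandardCharsets.UTF_8);
        return Long.parseLong(text.trim());
    }
}
```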
For the second problem I thought about breaking the initial file into a set of sub-files and processing them in parallel. This could be done by introducing a new 'breaking' spout, with each sub-file processed by a dedicated bolt. In this case a big problem arises with guaranteed processing, because in case of an error the sub-file that contains the failed line has to be fully reprocessed (the ack/fail methods of the spout)... Could you please suggest a best practice for solving this problem?
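The splitting step could look roughly like this (a minimal, hypothetical sketch that cuts the file into fixed-size chunks of lines; a real implementation would also have to keep XML elements intact across chunk boundaries):

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

// Sketch of splitting a large file into sub-files of at most
// linesPerChunk lines each, so the chunks can be handled in parallel.
class FileSplitter {
    public static List<Path> split(Path input, Path outDir, int linesPerChunk)
            throws IOException {
        List<Path> chunks = new ArrayList<>();
        try (BufferedReader reader = Files.newBufferedReader(input)) {
            String line = reader.readLine();
            while (line != null) {
                Path chunk = outDir.resolve("chunk-" + chunks.size() + ".txt");
                try (BufferedWriter writer = Files.newBufferedWriter(chunk)) {
                    int written = 0;
                    while (line != null && written < linesPerChunk) {
                        writer.write(line);
                        writer.newLine();
                        written++;
                        line = reader.readLine();
                    }
                }
                chunks.add(chunk);
            }
        }
        return chunks;
    }
}
```

With this layout, a failure would only require reprocessing the one chunk that contained the failed line, rather than the whole 400 MB file.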
Update
OK, here is what I have done so far.
Prerequisites
The following topology works because all its parts (spouts and bolts) are idempotent.
Introduced a separate spout that reads file lines (one by one) and sends them to an intermediate ActiveMQ queue ('file-line-queue'), so that failed file lines can be replayed easily (see the next step);
Created a separate spout for the 'file-line-queue' queue that receives each file line and emits it to the subsequent bolts. Since I use guaranteed message processing, in case of any bolt failure the message is reprocessed, and if the bolt chain succeeds the corresponding message is acknowledged (CLIENT_ACKNOWLEDGE mode).
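To make the acknowledge-only-after-success contract concrete, here is a broker-free toy model of CLIENT_ACKNOWLEDGE-style semantics (entirely hypothetical, no real JMS involved): a received message stays pending until it is explicitly acked, and anything left unacked can be redelivered:

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of CLIENT_ACKNOWLEDGE semantics (no real broker involved):
// receive() hands out a message but keeps it pending; only ack() removes
// it for good, and recover() puts every unacked message back in the queue.
class ToyAckQueue {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Integer, String> pending = new LinkedHashMap<>();
    private int nextId = 0;

    public void send(String body) {
        ready.addLast(body);
    }

    /** Returns a delivery id, or -1 if the queue is empty. */
    public int receive() {
        String body = ready.pollFirst();
        if (body == null) return -1;
        pending.put(nextId, body);
        return nextId++;
    }

    public String body(int deliveryId) {
        return pending.get(deliveryId);
    }

    public void ack(int deliveryId) {
        pending.remove(deliveryId);
    }

    /** Redeliver everything that was received but never acked. */
    public void recover() {
        for (String body : pending.values()) ready.addLast(body);
        pending.clear();
    }
}
```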
In case of a failure of the first (file-reading) spout, a RuntimeException is thrown, which kills the spout. Later on, a dedicated supervisor restarts the spout, causing the input file to be re-read. This produces duplicated messages, but since everything is idempotent, that is not a problem. It is also worth thinking here about a state repository to produce fewer duplicates...
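The idempotence assumption can be sketched like this (hypothetical names; real state would have to be durable and shared across workers): a step that remembers which line ids it has already handled turns duplicated deliveries into no-ops:

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of making a processing step idempotent: a line id that has
// already been handled is skipped, so replays after a spout restart
// are harmless. (In-memory state here; a real topology would need
// durable, shared state to survive worker restarts.)
class IdempotentProcessor {
    private final Set<Long> processed = new HashSet<>();
    private final AtomicInteger sideEffects = new AtomicInteger();

    /** Returns true only the first time a given line id is seen. */
    public boolean process(long lineId, String line) {
        if (!processed.add(lineId)) {
            return false; // duplicate delivery: do nothing
        }
        sideEffects.incrementAndGet(); // stand-in for the real work on 'line'
        return true;
    }

    public int sideEffectCount() {
        return sideEffects.get();
    }
}
```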
New Issue
In order to make the intermediate JMS layer more reliable, I've added an exception listener that restores the connection and the session for both the consumer and the producer. The problem is with the consumer: if the session is restored while I have a JMS message unacked in the middle of bolt processing, then after successful processing I need to ack it, but since the session is new, I get a 'can't find correlation id' issue.
Could somebody please suggest how to deal with it?