
I've recently started getting into the Big Data world by experimenting with Apache Storm. I've run into the following problem and have thought a lot about how to solve it, but all my approaches seem naïve.

Technologies

Apache Storm 0.9.3, Java 1.8.0_20

Context

There is a big XML file (~400 MB) that has to be read line by line (xml-file-spout). Each line read from the file is then emitted and processed by a chain of bolts.

Message processing has to be guaranteed (emitting with anchoring...).
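
To make "emitting with anchoring" concrete, here is a minimal sketch of what one of the bolts in the chain looks like (the class name, field names, and per-line work are just illustrative):

    import backtype.storm.task.OutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichBolt;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Tuple;
    import backtype.storm.tuple.Values;

    import java.util.Map;

    // Illustrative bolt: anchors every emitted tuple to its input tuple, so a
    // failure anywhere downstream makes Storm replay the original spout tuple.
    public class LineProcessingBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            String line = input.getStringByField("line");
            // The input tuple is passed as the anchor (first argument).
            collector.emit(input, new Values(process(line)));
            collector.ack(input);
        }

        private String process(String line) {
            return line.trim(); // placeholder for the real per-line work
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("processed-line"));
        }
    }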

Problem

Since the file is pretty big (it contains about 20 billions of lines), I read it with a scanner based on a buffered stream so as not to load the whole file into memory (see the sketch after the list below). So far so good. The problem emerges when there is an error somewhere in the middle of processing: the xml-file-spout itself dies, or there is some internal issue...

  1. The Nimbus will restart the spout, but the whole processing starts from the very beginning;
  2. This approach does not scale at all.
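
For illustration, the reading logic boils down to something like this stripped-down spout (the names are made up; the important point is that the current position exists only in memory, which is exactly why a restart begins from scratch):

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;

    import java.io.BufferedInputStream;
    import java.io.FileInputStream;
    import java.util.Map;
    import java.util.Scanner;

    // Illustrative xml-file-spout: streams the file line by line instead of
    // loading it whole, and emits each line with a message id so Storm can
    // track acks and fails for it.
    public class XmlFileSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private Scanner scanner;
        private long lineNumber; // lives only in memory, so it is lost on restart

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            try {
                scanner = new Scanner(new BufferedInputStream(
                        new FileInputStream("/data/big-file.xml")), "UTF-8");
            } catch (Exception e) {
                throw new RuntimeException("Cannot open the input file", e);
            }
        }

        @Override
        public void nextTuple() {
            if (scanner.hasNextLine()) {
                // The line number doubles as the message id for ack/fail tracking.
                collector.emit(new Values(scanner.nextLine()), ++lineNumber);
            }
        }

        @Override
        public void fail(Object msgId) {
            // A failed line cannot simply be re-read: the scanner has moved on.
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }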

Solution Thoughts

An initial idea for solving the first problem was to save the current state somewhere: a distributed cache, a JMS queue, a local disk file. When the spout opens, it should find such storage, read the state, and proceed from the specified file line. Here I also thought about storing the state in Storm's Zookeeper, but I don't know whether it is possible to address Zookeeper from the spout (is there such an ability)? Could you please suggest the best practice for this?

For problem 2 I thought about breaking the initial file into a set of subfiles and processing them in parallel. It could be done by introducing a new 'breaking' spout, where each subfile would be processed by a dedicated bolt. In this case a big problem arises with guaranteed processing, because in case of an error the subfile that contains the failed line has to be fully reprocessed (the ack/fail methods of the spout)... Could you please suggest the best practice for solving this problem?

Update

OK, here is what I have done so far.

Prerequisites

The following topology works because all its parts (spouts and bolts) are idempotent.

  1. Introduced a separate spout that reads file lines (one by one) and sends them to an intermediate ActiveMQ queue ('file-line-queue'), so that failed file lines can be replayed easily (see the next step);

  2. Created a separate spout for the 'file-line-queue' queue that receives each file line and emits it to the subsequent bolts. Since I use guaranteed message processing, in case of any bolt's failure the message is reprocessed, and if the bolt chain succeeds the corresponding message is acknowledged (CLIENT_ACKNOWLEDGE mode); a sketch of this spout is shown right below.
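
In simplified form, that queue-reading spout is along these lines (the broker URL is a placeholder; each unacked JMS message is held until Storm acks the corresponding tuple):

    import backtype.storm.spout.SpoutOutputCollector;
    import backtype.storm.task.TopologyContext;
    import backtype.storm.topology.OutputFieldsDeclarer;
    import backtype.storm.topology.base.BaseRichSpout;
    import backtype.storm.tuple.Fields;
    import backtype.storm.tuple.Values;
    import org.apache.activemq.ActiveMQConnectionFactory;

    import javax.jms.*;
    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative spout over the 'file-line-queue': a JMS message is only
    // acknowledged once the whole bolt chain has processed its tuple.
    public class FileLineQueueSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;
        private MessageConsumer consumer;
        private final Map<String, Message> pending = new ConcurrentHashMap<>();

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
            try {
                Connection connection =
                        new ActiveMQConnectionFactory("tcp://localhost:61616").createConnection();
                connection.start();
                Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                consumer = session.createConsumer(session.createQueue("file-line-queue"));
            } catch (JMSException e) {
                throw new RuntimeException("Cannot connect to the broker", e);
            }
        }

        @Override
        public void nextTuple() {
            try {
                Message message = consumer.receiveNoWait();
                if (message instanceof TextMessage) {
                    pending.put(message.getJMSMessageID(), message);
                    collector.emit(new Values(((TextMessage) message).getText()),
                            message.getJMSMessageID());
                }
            } catch (JMSException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public void ack(Object msgId) {
            Message message = pending.remove(msgId);
            if (message != null) {
                try {
                    message.acknowledge(); // ack only after the bolt chain succeeded
                } catch (JMSException e) {
                    // this is where acking through a recreated session blows up
                }
            }
        }

        @Override
        public void fail(Object msgId) {
            pending.remove(msgId); // left unacked so the broker can redeliver it later
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("line"));
        }
    }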

In case of the first (file-reading) spout's failure, a RuntimeException is thrown, which kills the spout. Later on, a dedicated supervisor restarts the spout, causing the input file to be re-read. This will produce duplicated messages, but since everything is idempotent, it is not a problem. Also, it is worth thinking about a state repository here to produce fewer duplicates...

New Issue

In order to make the intermediate JMS step more reliable, I've added an exception listener that restores the connection and the session for both the consumer and the producer. The problem is with the consumer: if the session is restored while I have a JMS message unacked in the middle of bolt processing, then after successful processing I need to ack it, but since the session is new, I receive a 'can't find correlation id' error.
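
For context, the listener is roughly the following (simplified: the broker URL is a placeholder, and recreating the consumer and producer is omitted):

    import org.apache.activemq.ActiveMQConnectionFactory;

    import javax.jms.Connection;
    import javax.jms.ExceptionListener;
    import javax.jms.JMSException;
    import javax.jms.Session;

    // Simplified version of my listener: on a broker error it tears down and
    // rebuilds the connection and session. A message received on the OLD
    // session stays unacked, and calling acknowledge() on it after this
    // listener has run is exactly where 'can't find correlation id' appears.
    public class ReconnectingExceptionListener implements ExceptionListener {
        private volatile Connection connection;
        private volatile Session session;

        @Override
        public synchronized void onException(JMSException error) {
            try {
                if (connection != null) {
                    connection.close();
                }
            } catch (JMSException ignored) {
                // the old connection is already unusable
            }
            try {
                connection = new ActiveMQConnectionFactory("tcp://localhost:61616").createConnection();
                connection.setExceptionListener(this);
                connection.start();
                session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
                // the consumer and producer are recreated from the new session here
            } catch (JMSException e) {
                throw new RuntimeException("Could not restore the JMS connection", e);
            }
        }
    }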

Could somebody please suggest how to deal with it?


1 Answer


To answer your questions first:

  1. Yes, you can store state somewhere like Zookeeper and use a library like Apache Curator to handle that (see the sketch after this list).
  2. Breaking the files up might help, but it still doesn't solve your problem of having to manage state.
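
For example, your spout could persist its last processed line number in Zookeeper with Curator along these lines (the znode path and connect string are just placeholders):

    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.retry.ExponentialBackoffRetry;

    import java.nio.charset.StandardCharsets;

    // Illustrative helper a spout could use to persist its read position in
    // Zookeeper, so a restarted spout can resume instead of starting over.
    public class LineOffsetStore {
        private static final String PATH = "/xml-file-spout/offset";
        private final CuratorFramework client;

        public LineOffsetStore(String zkConnect) {
            client = CuratorFrameworkFactory.newClient(zkConnect, new ExponentialBackoffRetry(1000, 3));
            client.start();
        }

        public void save(long lineNumber) throws Exception {
            byte[] data = Long.toString(lineNumber).getBytes(StandardCharsets.UTF_8);
            if (client.checkExists().forPath(PATH) == null) {
                client.create().creatingParentsIfNeeded().forPath(PATH, data);
            } else {
                client.setData().forPath(PATH, data);
            }
        }

        public long load() throws Exception {
            if (client.checkExists().forPath(PATH) == null) {
                return 0L; // nothing stored yet: start from the beginning
            }
            return Long.parseLong(new String(client.getData().forPath(PATH), StandardCharsets.UTF_8));
        }
    }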

Let's talk a bit about design here. Storm is built for streaming, not for batch. It seems to me that a Hadoop technology that works better for batch would be a better fit here: MapReduce, Hive, Spark, etc.

If you are intent on using Storm, then it will help to stream the data from somewhere that is easier to work with. You could write the file to Kafka or to a queue to help with your problems of managing state, ack/fail, and retry.
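
As a sketch of that direction, a small standalone producer could stream the file's lines into a Kafka topic like this (the broker address, topic name, and file path are placeholders):

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.Properties;

    // Streams each file line into a Kafka topic; a Kafka spout then gets
    // offsets, replay, and ack/fail handling from the broker instead of the
    // topology managing file state itself.
    public class FileToKafka {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            try (Producer<String, String> producer = new KafkaProducer<String, String>(props);
                 BufferedReader reader = new BufferedReader(new FileReader("/data/big-file.xml"))) {
                String line;
                long lineNumber = 0;
                while ((line = reader.readLine()) != null) {
                    // the line number as key gives each line a stable identity
                    producer.send(new ProducerRecord<String, String>(
                            "file-lines", Long.toString(++lineNumber), line));
                }
            }
        }
    }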