
I have been looking into Apache Kafka v0.8.1.1 to implement a high-level consumer (I do not need to process the messages; I simply need to put the data into MongoDB).

I have looked at the links below, which show in great detail how to implement the consumer:

Apache Kafka consumer wiki
Another Kafka consumer

But I am still unclear on how the consumer restarts after all of its threads have shut down. For example, say I have 4 consumer threads running and they have consumed all the messages from the Kafka broker. Once there are no messages left, the consumers do nothing, and after a specific timeout they are shut down. I am not sure how the consumer is restarted when new messages arrive at the Kafka broker.

Can someone share some code, or at least some pointers, on this? Also, is there a way to put our business logic in a callback method that gets called when messages arrive, instead of having a while loop?


1 Answer


I think you may be misunderstanding the use of timeouts during shutdown. Theoretically, you're consuming an infinite stream of events regardless of the time between those events, so your consumer should never be shut down unless you're updating the code or a machine crashes. When you actually do need to shut down the consumer, the 10000 millisecond timeout gives the Kafka consumer enough time to write its last read offset to ZooKeeper, so that when the consumer is restarted it resumes from the last offset it processed. This consumer shutdown normally occurs when your whole program is shut down (perhaps an InterruptedException is caught), not just the consumer. Thus, the consumer is restarted when your program is restarted.
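To make the shutdown sequence concrete, here is a minimal sketch of the "stop, then wait up to a timeout" pattern using only java.util.concurrent. The class name GracefulShutdownSketch and the method shutdownAndWait are made up for illustration; with the real high-level consumer you would call ConsumerConnector.shutdown() before stopping the pool, as noted in the comment.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownSketch {

    // Hypothetical helper: stops the pool and waits up to timeoutMs for the
    // worker threads to finish. Returns true if they all finished in time.
    public static boolean shutdownAndWait(ExecutorService pool, long timeoutMs) {
        // With the real Kafka 0.8 high-level consumer, this is where
        // ConsumerConnector.shutdown() would be called first, so the
        // consumer threads leave their loops (assumption: not shown here).
        pool.shutdown(); // accept no new tasks, let running ones finish
        try {
            if (!pool.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
                pool.shutdownNow(); // timed out: interrupt the stragglers
                return false;
            }
            return true;
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt(); // preserve interrupt status
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            final int id = i;
            pool.submit(new Runnable() {
                public void run() {
                    System.out.println("worker " + id + " finished");
                }
            });
        }
        // 10000 ms mirrors the timeout discussed above.
        System.out.println("clean shutdown: " + shutdownAndWait(pool, 10000));
    }
}
```

The timeout only bounds how long the program waits while exiting; it plays no part in normal consumption.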

EDIT

I should add that Kafka's ConsumerIterator follows this model of never-ending consumption. By default (with no consumer.timeout.ms configured), the iterator's hasNext and next methods block until the next message can be read. Therefore, the only way the timeout in the example is ever reached is if the consumer threads are shut down by some exception.
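The blocking behaviour can be illustrated without Kafka at all. The sketch below uses a LinkedBlockingQueue as a stand-in for the Kafka stream (an assumption made purely for illustration; BlockingConsumptionSketch is a made-up name). The worker thread stays alive while no messages are available and picks up the next one as soon as it arrives, so nothing needs to be restarted.

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingConsumptionSketch {

    // Runs one consumer thread over a queue that stands in for the stream
    // and returns the messages it processed, in order.
    public static List<String> run() {
        final BlockingQueue<String> stream = new LinkedBlockingQueue<String>();
        final List<String> processed = new CopyOnWriteArrayList<String>();
        final CountDownLatch done = new CountDownLatch(2);

        Thread worker = new Thread(new Runnable() {
            public void run() {
                try {
                    while (true) {
                        String message = stream.take(); // blocks while idle
                        processed.add(message);
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    // Interruption is the only way out of the loop.
                    Thread.currentThread().interrupt();
                }
            }
        });
        worker.start();

        try {
            stream.put("first");
            Thread.sleep(200); // idle period: no messages available
            if (!worker.isAlive()) {
                throw new IllegalStateException("worker exited while idle");
            }
            stream.put("second"); // the same thread handles the new message
            done.await(5, TimeUnit.SECONDS);
            worker.interrupt(); // simulate program shutdown
            worker.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```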

EDIT 2

I haven't seen any Kafka consumer API that supports callbacks. I think your only option right now is writing your own callback implementation, e.g.:

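import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import kafka.message.MessageAndMetadata;

// Application-defined callback, invoked once per consumed message.
public interface Callback<K, V> {
  void call(MessageAndMetadata<K, V> message);
}

// ExecutorService (not Executor) is needed because submit() is declared there.
// MyCallback is your own implementation of Callback<byte[], byte[]>.
ExecutorService executor = Executors.newCachedThreadPool();
final Callback<byte[], byte[]> callback = new MyCallback();
// "it" is the ConsumerIterator<byte[], byte[]> obtained from the KafkaStream.
while (it.hasNext()) {
  final MessageAndMetadata<byte[], byte[]> message = it.next();
  // Hand each message to the pool so the loop can keep reading.
  executor.submit(new Runnable() {
    public void run() {
      callback.call(message);
    }
  });
}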

You might be interested that they're currently rewriting the consumer API for Kafka 0.9, but I don't think I've seen callbacks in the rewrite (though I could be wrong).