6 votes

I am running a Kafka Streams application which consumes data from 2 topics and outputs the joined/merged result to a 3rd topic. The Kafka topics have 15 partitions and a replication factor of 3. We have 5 Kafka brokers and 5 ZooKeeper nodes. I am running 15 instances of the Kafka Streams application, so each instance handles 1 partition. Kafka version: 0.11.0.0

I am getting the exception below in my Kafka Streams application:

org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604)
    at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173)
    at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307)
    at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49)
    at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187)
    at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259)
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362)
    at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346)
    at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118)
    at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448)
    at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$1800(StreamThread.java:73)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:218)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:353)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)
2017-08-09 14:50:49 - [ERROR] [click-live-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks:1453] :

Can someone please help me understand what the cause could be, and how to fix it?

Also, when one of my Kafka brokers is down, my Kafka Streams application does not connect to the other brokers. I have set brokers.list=broker1:9092,broker2:9092,broker3:9092,broker4:9092,broker5:9092
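As a side note on the broker list: in a standard Kafka Streams setup the broker list is supplied via the bootstrap.servers property, not brokers.list, which may be why failover to the other brokers is not happening. A minimal sketch (the broker host names are the placeholders from the question; the application id is illustrative):

```java
import java.util.Properties;

public class StreamsProps {
    public static Properties build() {
        Properties props = new Properties();
        // Kafka Streams (and the underlying consumer/producer) read the
        // broker list from "bootstrap.servers"; "brokers.list" is not a
        // recognized property and would be silently ignored.
        props.put("bootstrap.servers",
                "broker1:9092,broker2:9092,broker3:9092,broker4:9092,broker5:9092");
        // Illustrative application id, required by Kafka Streams.
        props.put("application.id", "click-live");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(build().getProperty("bootstrap.servers"));
    }
}
```

With all brokers listed under bootstrap.servers, the client only needs one of them reachable at startup to discover the rest of the cluster.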

1
What is your max.poll.interval.ms setting? I had similar issues when max.poll.interval.ms was too short. It is best to take the default value when using Kafka > 0.11, or MAX_INT in versions < 0.11. – Seweryn Habdank-Wojewódzki
Did you try to follow the suggestions in the message? "You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records" – Dennis Jaheruddin

1 Answer

0 votes

Based on the information provided, the most likely solution is to follow the suggestion in the error message itself:

"You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records"
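In Kafka Streams these settings can be passed straight through to the embedded consumer as plain properties. A hedged sketch of the two knobs the message names (the values 300000 and 100 are illustrative starting points, not recommendations; tune them to how long your join/merge processing actually takes per batch):

```java
import java.util.Properties;

public class TuningProps {
    public static Properties build() {
        Properties props = new Properties();
        // Allow more time between successive poll() calls before the
        // broker considers the consumer dead and rebalances its
        // partitions to another instance. Illustrative value: 5 minutes.
        props.put("max.poll.interval.ms", "300000");
        // Return smaller batches from each poll() so that one batch can
        // always be processed within the interval above. Illustrative value.
        props.put("max.poll.records", "100");
        return props;
    }
}
```

Either lever works because the underlying constraint is the same: each batch returned by poll() must be fully processed before max.poll.interval.ms elapses, otherwise the group rebalances and the subsequent offset commit fails with the CommitFailedException shown in the question.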