I am running a Kafka Stream application which consumes data from 2 topics and output the joined/merged result into 3 topic. The kafka topics have 15 partitions and 3 replication factor. We have 5 kafka brokers and 5 zookeeper's. I am running 15 instances of Kafka Stream application so each application can have 1 partition. Kafka version- 0.11.0.0
I am getting the below exception in my kafka stream application:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:725) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:604) at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1173) at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:307) at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:49) at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:268) at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:187) at org.apache.kafka.streams.processor.internals.StreamTask.commitImpl(StreamTask.java:259) at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:362) at org.apache.kafka.streams.processor.internals.StreamTask.suspend(StreamTask.java:346) at org.apache.kafka.streams.processor.internals.StreamThread$3.apply(StreamThread.java:1118) at org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks(StreamThread.java:1448) at org.apache.kafka.streams.processor.internals.StreamThread.suspendTasksAndState(StreamThread.java:1110) at org.apache.kafka.streams.processor.internals.StreamThread.access$1800(StreamThread.java:73) at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsRevoked(StreamThread.java:218) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinPrepare(ConsumerCoordinator.java:422) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:353) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:310) at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:297) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:582) at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553) at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527) 2017-08-09 14:50:49 - [ERROR] [click-live-StreamThread-1] [org.apache.kafka.streams.processor.internals.StreamThread.performOnStreamTasks:1453] :
Can someone please help what could be the cause and solution?
Also, when 1 of my kafka broker is down, my kafka stream application is not connecting to other broker?
I have set brokers.list=broker1:9092,broker2:9092,broker3:9092,broker4:9092,broker5:9092
max.poll.interval.ms
setting? I had similar issues whenmax.poll.interval.ms
was too short. The best is to take default value when using Kafka >0.11 or MAX_INT in versions <0.11. – Seweryn Habdank-Wojewódzki