
I have a Kafka Streams application consuming from and producing to a Kafka cluster with 3 brokers and a replication factor of 3. Apart from the consumer offsets topic (50 partitions), every topic has a single partition.
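For reference, the topology itself is trivial. Here's a minimal sketch of the kind of app I'm running (the topic names, application id, broker addresses, and string serdes below are placeholders, not my real config):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStreamBuilder;

public class PassThroughApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "pass-through-app");  // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9092,broker3:9092");  // placeholder
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

        // Single-partition input and output topics, as described above.
        KStreamBuilder builder = new KStreamBuilder();
        builder.<String, String>stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder, props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}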

When the brokers attempt a preferred replica election, the Streams app (which is running on a completely different instance than the brokers) fails with the error:

Caused by: org.apache.kafka.streams.errors.StreamsException: task [0_0] exception caught when producing
    at org.apache.kafka.streams.processor.internals.RecordCollectorImpl.checkForException(RecordCollectorImpl.java:119)
    ...
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:197)
Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition.

Is it normal that the Streams app attempts to be the leader for the partition, given that it's running on a server that's not part of the Kafka cluster?

I can reproduce this behaviour on demand by:

  1. Killing one of the brokers (whereupon the other 2 take over as leader for all partitions that had the killed broker as their leader, as expected)
  2. Bringing the killed broker back up
  3. Triggering a preferred replica leader election with bin/kafka-preferred-replica-election.sh --zookeeper localhost

My issue seems similar to this reported failure, so I'm wondering whether this is a new Kafka Streams bug. My full stack trace is identical to the gist linked in that report (here).

Another potentially interesting detail is that during the leader election, I get these messages in the controller.log of the broker:

[2017-04-12 11:07:50,940] WARN [Controller-3-to-broker-3-send-thread], Controller 3's connection to broker BROKER-3-HOSTNAME:9092 (id: 3 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
java.io.IOException: Connection to BROKER-3-HOSTNAME:9092 (id: 3 rack: null) failed
    at kafka.utils.NetworkClientBlockingOps$.awaitReady$1(NetworkClientBlockingOps.scala:84)
    at kafka.utils.NetworkClientBlockingOps$.blockingReady$extension(NetworkClientBlockingOps.scala:94)
    at kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:232)
    at kafka.controller.RequestSendThread.liftedTree1$1(ControllerChannelManager.scala:185)
    at kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:184)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:63)

I initially suspected this connection error, but after the leader election crashes the Streams app, restarting the app restores normal operation until the next election, without my touching the brokers at all.
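As an aside, a handler like the following sketch makes these crashes visible without tailing the logs (assuming streams is the running KafkaStreams instance from the sketch above; the stream thread stays dead either way):

// Log the uncaught StreamsException when a stream thread dies during the election.
streams.setUncaughtExceptionHandler((thread, exception) ->
        System.err.println("Stream thread " + thread.getName() + " died: " + exception));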

All servers (3 Kafka brokers and the Streams app) are running on EC2 instances.

1 Answer

This is now fixed in 0.10.2.1. If you can't upgrade to that release, make sure you have these two parameters set as follows in your Streams config:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

final Properties props = new Properties();
...
props.put(ProducerConfig.RETRIES_CONFIG, 10);  // retry instead of fail on the (retriable) NotLeaderForPartitionException
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, Integer.toString(Integer.MAX_VALUE));  // don't get kicked from the group while retrying
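These are, as far as I know, the values 0.10.2.1 ships as Streams defaults: the higher retries lets the embedded producer ride out the retriable NotLeaderForPartitionException while leadership moves back to the preferred replica, and the effectively unlimited max.poll.interval.ms keeps the consumer from being evicted from the group while the producer is blocked retrying.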