I have a Kafka Streams application that starts and runs successfully. We run 4 instances of it. Occasionally one of the instances is legitimately killed, which causes several rounds of rebalancing until the old node is replaced.

Sometimes during the rebalance, one or more previously healthy nodes fail. The logs indicate that the Streams application transitions into the PENDING_SHUTDOWN state immediately after receiving the following exception:

java.lang.IllegalStateException: No current assignment for partition public.chat.message-28
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.assignedState(SubscriptionState.java:256)
    at org.apache.kafka.clients.consumer.internals.SubscriptionState.resetFailed(SubscriptionState.java:418)
    at org.apache.kafka.clients.consumer.internals.Fetcher$2.onFailure(Fetcher.java:621)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
    at org.apache.kafka.clients.consumer.internals.RequestFutureAdapter.onFailure(RequestFutureAdapter.java:30)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onFailure(RequestFuture.java:209)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireFailure(RequestFuture.java:177)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.raise(RequestFuture.java:147)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:571)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:389)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
    at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:292)
    at org.apache.kafka.clients.consumer.internals.Fetcher.getAllTopicMetadata(Fetcher.java:275)
    at org.apache.kafka.clients.consumer.KafkaConsumer.listTopics(KafkaConsumer.java:1849)
    at org.apache.kafka.clients.consumer.KafkaConsumer.listTopics(KafkaConsumer.java:1827)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.refreshChangelogInfo(StoreChangelogReader.java:259)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.initialize(StoreChangelogReader.java:133)
    at org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:79)
    at org.apache.kafka.streams.processor.internals.TaskManager.updateNewAndRestoringTasks(TaskManager.java:328)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:866)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:804)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:773)

Before this error, we often also see informational log messages reporting a disconnect exception:

 Error sending fetch request (sessionId=568252460, epoch=7) to node 4: org.apache.kafka.common.errors.DisconnectException

I suspect the two are related, but I can't work out why.

Can anyone give me some hints as to what might be causing this issue, and any possible solutions?

Additional Info:

  • Kafka 2.2.1
  • 32 partitions spread evenly across the 4 worker nodes
  • StreamsConfig settings:
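kafkaStreamProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 2);      // replication factor of internal changelog/repartition topics
kafkaStreamProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);    // one standby copy of each stateful task
kafkaStreamProps.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);      // stream threads per instance
kafkaStreamProps.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 120000); // commit every 2 minutes
kafkaStreamProps.put(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE); // enable topology optimizations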
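
As a rough sanity check on these numbers, the sketch below deals stream tasks out round-robin to show how the load shifts when one of the 4 instances disappears. It assumes one task per input partition (a single sub-topology), which may not hold once topology optimization or repartitioning adds sub-topologies. It is a simplified illustration, not Kafka Streams' actual task assignor, which also tries to keep tasks where they were and places the standby tasks requested by NUM_STANDBY_REPLICAS_CONFIG; both are ignored here.

```java
import java.util.Arrays;

public class TaskDistributionSketch {

    // Deals numTasks out round-robin over numInstances and returns the number
    // of active tasks each instance ends up with. Illustration only: standby
    // tasks and the real assignor's stickiness are not modelled.
    static int[] activeTasksPerInstance(int numTasks, int numInstances) {
        int[] counts = new int[numInstances];
        for (int task = 0; task < numTasks; task++) {
            counts[task % numInstances]++;
        }
        return counts;
    }

    // Tasks on the busiest thread if an instance spreads its tasks as evenly
    // as possible across its stream threads (ceiling division).
    static int maxTasksPerThread(int tasksOnInstance, int threadsPerInstance) {
        return (tasksOnInstance + threadsPerInstance - 1) / threadsPerInstance;
    }

    public static void main(String[] args) {
        int tasks = 32;             // assumption: one task per input partition
        int threadsPerInstance = 4; // NUM_STREAM_THREADS_CONFIG from the question

        for (int instances : new int[] {4, 3}) {
            int[] perInstance = activeTasksPerInstance(tasks, instances);
            int busiest = Arrays.stream(perInstance).max().getAsInt();
            System.out.println(instances + " instances: " + Arrays.toString(perInstance)
                    + ", up to " + maxTasksPerThread(busiest, threadsPerInstance)
                    + " tasks per thread");
        }
        // prints:
        // 4 instances: [8, 8, 8, 8], up to 2 tasks per thread
        // 3 instances: [11, 11, 10], up to 3 tasks per thread
    }
}
```

Under these assumptions, each surviving instance takes on two or three extra active tasks until the replacement node joins and the tasks are rebalanced again.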

Answer:

This looks like it could be related to https://issues.apache.org/jira/browse/KAFKA-9073, which has been fixed in Kafka Streams 2.3.2.

If you can't wait for that release, you could try creating a private build using the changeset from this pull request: https://github.com/apache/kafka/pull/7630/files
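
To see how the DisconnectException and the IllegalStateException can be related, here is a simplified model of the race the stack trace suggests. An offset-reset request for a partition is in flight, the consumer's assignment changes (here, during a rebalance) so that partition is no longer assigned, and then the request fails, for example because the broker connection dropped. The failure callback then looks up state for the revoked partition, which is where SubscriptionState.assignedState throws. The class and method names below are hypothetical stand-ins, not Kafka's internal API. The guarded variant only illustrates how such a race can be avoided; it is not a copy of the change in the linked pull request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of the race; names are illustrative, not Kafka code.
public class ResetRaceSketch {

    // Per-partition state that exists only while the partition is assigned.
    static final class PartitionState {
        long nextRetryTimeMs;
    }

    // Stand-in for the consumer's subscription state.
    static final class Subscriptions {
        private final Map<String, PartitionState> assigned = new HashMap<>();

        // Replaces the assignment: drops revoked partitions, adds new ones.
        void assign(Set<String> partitions) {
            assigned.keySet().retainAll(partitions);
            for (String p : partitions) {
                assigned.putIfAbsent(p, new PartitionState());
            }
        }

        // Mirrors what the stack trace shows: resetFailed -> assignedState
        // throws when the partition is no longer assigned.
        void resetFailedStrict(Set<String> partitions, long nextRetryTimeMs) {
            for (String p : partitions) {
                PartitionState state = assigned.get(p);
                if (state == null) {
                    throw new IllegalStateException("No current assignment for partition " + p);
                }
                state.nextRetryTimeMs = nextRetryTimeMs;
            }
        }

        // Guarded variant: skip partitions revoked while the request was in flight.
        void resetFailedGuarded(Set<String> partitions, long nextRetryTimeMs) {
            for (String p : partitions) {
                PartitionState state = assigned.get(p);
                if (state != null) {
                    state.nextRetryTimeMs = nextRetryTimeMs;
                }
            }
        }
    }

    // Runs the scenario; returns the exception message, or null if nothing was thrown.
    static String simulate(boolean guarded) {
        Subscriptions subs = new Subscriptions();
        Set<String> inFlight = Set.of("public.chat.message-28");
        subs.assign(inFlight);                        // 1. assigned; offset-reset request sent
        subs.assign(Set.of("public.chat.message-3")); // 2. rebalance revokes it before a response
        try {
            // 3. the request fails (e.g. a disconnect) and the failure callback runs
            if (guarded) {
                subs.resetFailedGuarded(inFlight, 100L);
            } else {
                subs.resetFailedStrict(inFlight, 100L);
            }
            return null;
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println("strict:  " + simulate(false));
        System.out.println("guarded: " + simulate(true));
    }
}
```

In the strict variant the failure callback crashes with the same message as in the question; in the guarded variant the stale failure is simply ignored.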