4
votes

I have a Kafka-Streams application running with one thread, processing a topic with one partition just fine.

I need to run multiple instances of this application processing different topics at the same time. In my current scenario, all topics have one partition only.

When I run a new instance of the same application (with the same APPLICATION_ID), processing a different topic, the Streams client doesn't create a new task in this new instance. The first instance keeps processing the first topic in task 0_0, and the second instance waits without an assigned partition, doing nothing.

I know I'm using topics with one partition only, but if I have two instances and two topics with one partition each, that makes two partitions to process. Why can't both topics, each with its single partition, be processed at the same time, one in each instance?

I suspect it has something to do with the StreamsPartitionAssignor, but that assignment strategy can't be changed in a Kafka Streams application:

Kafka Streams does not allow to use a custom partition assignor. If you set one yourself, it will be overwritten with the StreamsPartitionAssignor [1]. This is needed to ensure that -- if possible -- partitions are re-assigned to the same consumers (a.k.a. stickiness) during rebalancing.

Edit:

Topology of the application:

[2019-11-20 09:36:35,406] [INFO] stream-thread [avro-to-json-d07ad9ad-f4b6-4787-96cf-19c48e72ad46-StreamThread-1] Starting (org.apache.kafka.streams.processor.internals.StreamThread)
[2019-11-20 09:36:35,407] [INFO] stream-thread [avro-to-json-d07ad9ad-f4b6-4787-96cf-19c48e72ad46-StreamThread-1] State transition from CREATED to RUNNING (org.apache.kafka.streams.processor.internals.StreamThread)
[2019-11-20 09:36:35,407] [INFO] stream-client [avro-to-json-d07ad9ad-f4b6-4787-96cf-19c48e72ad46] Started Streams client (org.apache.kafka.streams.KafkaStreams)
Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [])
      --> KSTREAM-MAP-0000000001
    Processor: KSTREAM-MAP-0000000001 (stores: [])
      --> KSTREAM-MAP-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-MAP-0000000002 (stores: [])
      --> KSTREAM-TRANSFORM-0000000003
      <-- KSTREAM-MAP-0000000001
    Processor: KSTREAM-TRANSFORM-0000000003 (stores: [])
      --> KSTREAM-SINK-0000000004
      <-- KSTREAM-MAP-0000000002
    Sink: KSTREAM-SINK-0000000004 (extractor class: kafka.AvroToJson$$Lambda$97/741730375@957e06)
      <-- KSTREAM-TRANSFORM-0000000003
1
Could you please add your topology as output by topology.describe().toString() to your question? (Bruno Cadonna)
@BrunoCadonna Added the Topology of the application. (kuro)

1 Answer

5
votes

If I understand you correctly, you run two Streams clients with the same application.id and the same topology except for the input topics. Because the input topics differ, you are effectively running two different Streams apps. Running two different Streams apps with the same application.id is undefined behaviour, since the application.id needs to be unique within a Kafka cluster (see https://kafka.apache.org/23/documentation/#streamsconfigs).

You can either

  1. increase the partitions of one of the topics and use that topic as your input topic in both Streams apps (making them the same app), or
  2. change the application.id of one of the two apps.

Note that option 1 gives you automatic rebalancing of the workload between Streams clients whereas option 2 does not.
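As a rough illustration of option 2, here is a minimal sketch that derives a distinct application.id from each instance's input topic. The class name, the `forTopic` helper, the topic names, the broker address and the `avro-to-json-` prefix (borrowed from the client id in your log) are all assumptions made for the example, not something taken from your code. It uses the plain config keys `application.id` and `bootstrap.servers` so that it runs without the Kafka libraries on the classpath.

```java
import java.util.Properties;

public class StreamsAppConfig {
    // Hypothetical prefix; the question's log suggests an id like "avro-to-json".
    static final String APP_ID_PREFIX = "avro-to-json-";

    // Builds the config for one instance. Each input topic gets its own
    // application.id, so each instance is a separate Streams app.
    static Properties forTopic(String inputTopic, String bootstrapServers) {
        Properties props = new Properties();
        props.put("application.id", APP_ID_PREFIX + inputTopic);
        props.put("bootstrap.servers", bootstrapServers);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder topic names and broker address.
        Properties first = forTopic("topic-a", "localhost:9092");
        Properties second = forTopic("topic-b", "localhost:9092");
        System.out.println(first.getProperty("application.id"));
        System.out.println(second.getProperty("application.id"));
    }
}
```

Each Properties object would then be passed to `new KafkaStreams(topology, props)` in the corresponding instance. With this setup the two instances no longer share a consumer group, which is why there is no rebalancing of work between them.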