Consider the following code:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
      bootstrapAddress);
    props.put(
      ConsumerConfig.GROUP_ID_CONFIG, 
      groupId);
    props.put(
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class);
    props.put(
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
      StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> 
  kafkaListenerContainerFactory() {

    ConcurrentKafkaListenerContainerFactory<String, String> factory
      = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

I have created a ConsumerFactory and a ConcurrentKafkaListenerContainerFactory. I have not set the concurrency on the listener container factory. I have a method annotated with @KafkaListener:

@KafkaListener(topics = "topicName")
public void listen(String message) {
    System.out.println("Received Message: " + message);
}

When I don't set the concurrency property, will Spring create 1 consumer instance and 1 Kafka listener container, belonging to the group specified in the consumer factory?

If I change the concurrency to 3, will Spring create 3 consumer instances (so 3 consumers in the same consumer group specified when configuring the consumer factory) and 3 listener containers?

Also, depending on the concurrency, and assuming we are listening to only one topic for now, will we have 3 methods annotated with @KafkaListener, all 3 (if no partition is specified) listening to different partitions, assigned by Kafka in a round-robin manner?

I am new to Kafka and wanted to clarify my understanding.

1 Answer

When I don't set the concurrency property, will Spring create 1 consumer instance and 1 Kafka listener container, belonging to the group specified in the consumer factory?

Yes. You will have one consumer, in the group configured on the consumer factory, fetching events from all the partitions of that topic.

If I change the concurrency to 3, will Spring create 3 consumer instances (so 3 consumers in the same consumer group specified when configuring the consumer factory) and 3 listener containers?

You will have 3 consumer instances in the same group. If the topic has at least 3 partitions, each consumer will fetch events from its own share of those partitions. All of the consumers deliver their events to the same @KafkaListener method.
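For reference, a minimal sketch of how the concurrency could be set on the factory from the question. The class name ListenerConcurrencyConfig is only illustrative, and the ConsumerFactory bean is assumed to be the one defined in the question:

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;

// Illustrative class name; the ConsumerFactory bean is assumed to be the one from the question.
@Configuration
public class ListenerConcurrencyConfig {

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String>
            kafkaListenerContainerFactory(ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        // Ask for three consumers in the same group, each polling on its own thread.
        factory.setConcurrency(3);
        return factory;
    }
}
```

With this in place, the single @KafkaListener method is backed by one concurrent container that starts three child containers, each with its own consumer. If the topic has fewer than 3 partitions, the extra consumers simply receive no assignment.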

You can also be more specific and assign partitions (and starting offsets) explicitly:

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")
}))
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message
        + " from partition: " + partition);
}

Also, depending on the concurrency, and assuming we are listening to only one topic for now, will we have 3 methods annotated with @KafkaListener, all 3 (if no partition is specified) listening to different partitions, assigned by Kafka in a round-robin manner?

This doesn't quite make sense. First of all, @KafkaListener is a high-level abstraction provided by Spring Kafka, and you do not need 3 annotated methods: the concurrency setting controls how many consumers back a single listener method. Second, Kafka does not distribute messages to consumers in a round-robin manner (it is different for producers). If you have 3 consumers (same consumer group, subscribed to the same topic) and 3 partitions in the topic, Kafka will rebalance and assign one partition to each consumer, and each consumer will fetch events only from the partitions assigned to it. After each consumer receives its events, Spring Kafka delivers them to the @KafkaListener method.
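To make the assignment idea concrete, here is a simplified, self-contained illustration of how the partitions of one topic might be spread over the consumers in a group. It is not Kafka's actual assignor code: the class PartitionAssignmentSketch and its assign method are hypothetical, and the contiguous-range split is only loosely modelled on a range-style assignment.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of Kafka or Spring Kafka.
class PartitionAssignmentSketch {

    // Splits partitions 0..partitionCount-1 into contiguous ranges, one range per consumer.
    // The first (partitionCount % consumerCount) consumers get one extra partition.
    static List<List<Integer>> assign(int partitionCount, int consumerCount) {
        if (consumerCount <= 0) {
            throw new IllegalArgumentException("consumerCount must be positive");
        }
        int base = partitionCount / consumerCount;
        int extra = partitionCount % consumerCount;
        List<List<Integer>> assignment = new ArrayList<>();
        int next = 0;
        for (int c = 0; c < consumerCount; c++) {
            int size = base + (c < extra ? 1 : 0);
            List<Integer> owned = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                owned.add(next++);
            }
            assignment.add(owned);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(3, 1)); // one consumer owns every partition
        System.out.println(assign(3, 3)); // one partition per consumer
        System.out.println(assign(2, 3)); // the third consumer stays idle
    }
}
```

The point of the sketch is that every partition has exactly one owner within the group, so raising the concurrency above the number of partitions only adds idle consumers.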