I am new to Apache Kafka and I am trying to understand the differences between:

  1. Creating two consumers that belong to the same group id that consume from two partitions of the same topic.
  2. Creating one consumer with two threads that consume from two partitions of the same topic.

In the first approach, my understanding is that each consumer will only consume messages from the partition assigned to it, because the two consumers belong to the same group.

In my example, consumer1 will consume AAAA and BBBB, and consumer2 will consume CCCC and DDDD.
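
As a rough illustration of that understanding, the sketch below models a consumer group as a map from consumer to owned partitions. It is a simplified round-robin model, not Kafka's actual assignor, and the class and method names (GroupAssignmentSketch, assign) are hypothetical. The point it shows is only that every partition ends up with exactly one owner inside the group.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of consumer-group assignment; NOT Kafka's real assignor.
final class GroupAssignmentSketch {

    // Spread partitions 0..partitionCount-1 over the consumers in round-robin
    // order, so that each partition has exactly one owner in the group.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // Two consumers in the same group, one topic with two partitions.
        System.out.println(assign(List.of("consumer1", "consumer2"), 2));
    }
}
```

With two consumers and two partitions, each consumer owns one partition, so the records of a partition are only ever read by a single consumer.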

In the second approach, as I can see in the documentation:

Consumer with many threads: If processing a record takes a while, a single Consumer can run multiple threads to process records, but it is harder to manage offset for each Thread/Task. If one consumer runs multiple threads, then two messages on the same partitions could be processed by two different threads which make it hard to guarantee record delivery order without complex thread coordination. This setup might be appropriate if processing a single task takes a long time, but try to avoid it.

So in the example below, some different situations could happen:

  • Thread1 consumes AAAA and CCCC / Thread2 consumes BBBB and DDDD
  • Thread1 consumes AAAA and BBBB / Thread2 consumes CCCC and DDDD
  • Thread1 consumes AAAA and DDDD / Thread2 consumes CCCC and BBBB
  • Thread1 consumes CCCC and DDDD / Thread2 consumes AAAA and BBBB
  • Thread1 consumes BBBB and DDDD / Thread2 consumes AAAA and CCCC
  • Thread1 consumes BBBB and CCCC / Thread2 consumes AAAA and DDDD

The difference between the two approaches seems obvious. In the first approach, each consumer sequentially consumes only the partition assigned to it. In the second approach, each thread can consume from both partitions, so it could happen that Thread1 consumes AAAA and Thread2 consumes BBBB, and the processing of BBBB finishes before the processing of AAAA.
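
The ordering risk described above can be sketched in plain Java without Kafka. This is only a model under stated assumptions: a two-thread pool stands in for the worker threads of a single consumer, the record names AAAA and BBBB come from the example, and the "slow" processing of AAAA is simulated by making it wait until BBBB is done. The class name OutOfOrderSketch is hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Model of one consumer handing records of the SAME partition to two threads.
final class OutOfOrderSketch {

    static List<String> completionOrder() {
        List<String> completed = new CopyOnWriteArrayList<>();
        CountDownLatch bbbbDone = new CountDownLatch(1);
        ExecutorService workers = Executors.newFixedThreadPool(2);

        // AAAA is handed out first, but its processing is slow: here it is
        // forced to wait for BBBB, standing in for a long-running task.
        workers.submit(() -> {
            try {
                bbbbDone.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.add("AAAA");
        });

        // BBBB is handed out second and finishes immediately.
        workers.submit(() -> {
            completed.add("BBBB");
            bbbbDone.countDown();
        });

        workers.shutdown();
        try {
            workers.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return List.copyOf(completed);
    }

    public static void main(String[] args) {
        // Records were handed out as AAAA, BBBB but complete as BBBB, AAAA.
        System.out.println(completionOrder());
    }
}
```

The records are dispatched in partition order, yet the completion order is reversed, which is the situation the quoted documentation warns about.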

I have developed a Kafka consumer with Spring Boot. My Kafka topic has 5 partitions. Each partition has information that needs to be consumed sequentially. I have decided to create only one consumer with 5 threads instead of creating 5 consumers. In theory, that doesn't really make sense, because I need to consume the information of each partition sequentially, and creating threads would break that logic.

In the Kafka listener I use the RECEIVED_PARTITION_ID header to print the partition the data is being consumed from. I have configured log4j to print the name of the thread that is running the KafkaListener, for example: [org.springframework.kafka.KafkaListenerEndpointContainer#0-thread-C-1]

The code of KafkaConsumer.java:

@KafkaListener(topics = "${kafka.topic}", groupId = "${kafka.groupid}", containerFactory = "kafkaListenerContainerFactory")
public void consumeJson(String mensaje,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) String partitionID) throws InterruptedException {
    logger.info("-> Consumed JSon message");
    logger.info("-> Read from partition:" + partitionID);
    logger.info("-> Consumed JSON Message: [" + mensaje + "]");
}

The code of KafkaConfiguration.java:

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> config = new HashMap<>();

    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
    config.put(ConsumerConfig.GROUP_ID_CONFIG, groupid);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    config.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPoll);
    // Manual commit (so that AckMode.BATCH can be used), set via configuration
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);

    return new DefaultKafkaConsumerFactory<>(config);
}

@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String,String>> kafkaListenerContainerFactory(){
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<String, String>();
    factory.setConsumerFactory(consumerFactory());
    factory.setConcurrency(5);
    // Commit the offsets once all the records returned by a poll() have been processed.
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH);
    return factory;
}

When I run my Spring Boot application and look at the log, there appears to be a one-to-one thread/partition relationship, just as if there were one consumer per partition. Each thread only consumes data from one partition! Thread0 -> Partition0, Thread1 -> Partition1, Thread2 -> Partition2, Thread3 -> Partition3, Thread4 -> Partition4. I have checked thousands of log rows and it always matches.

A small part of the log:

[org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1][9980][e.e.s.l.KafkaConsumer:][][] Consumed JSon message Read from partition3

[org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1][9980][e.e.s.l.KafkaConsumer:][][] Consumed JSon message Read from partition1

[org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1][9980][e.e.s.l.KafkaConsumer:][][] Consumed JSon message Read from partition0

[org.springframework.kafka.KafkaListenerEndpointContainer#0-4-C-1][9980][e.e.s.l.KafkaConsumer:][][] Consumed JSon message Read from partition4

[org.springframework.kafka.KafkaListenerEndpointContainer#0-2-C-1][9980][e.e.s.l.KafkaConsumer:][][] Consumed JSon message Read from partition2

[org.springframework.kafka.KafkaListenerEndpointContainer#0-1-C-1][9980][e.e.s.l.KafkaConsumer:][][] Consumed JSon message Read from partition1
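
Checking thousands of rows by eye is error-prone, so the one-to-one relationship could also be verified with a small script. The following is a minimal sketch that assumes every relevant line starts with the bracketed thread name and ends with "partition" followed by the partition number, as in the excerpt above. The class ThreadPartitionCheck and its methods are hypothetical helpers, not part of Spring Kafka.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class ThreadPartitionCheck {

    // Group 1: thread name in the first [...] block. Group 2: partition number.
    private static final Pattern LINE =
            Pattern.compile("^\\[([^\\]]+)\\].*partition:?\\s*(\\d+)\\s*$");

    // Collect, per thread name, the set of partitions it was seen reading.
    static Map<String, Set<Integer>> partitionsPerThread(List<String> logLines) {
        Map<String, Set<Integer>> seen = new TreeMap<>();
        for (String line : logLines) {
            Matcher m = LINE.matcher(line);
            if (m.find()) {
                seen.computeIfAbsent(m.group(1), k -> new TreeSet<>())
                        .add(Integer.parseInt(m.group(2)));
            }
        }
        return seen;
    }

    // True when every thread read exactly one partition and no partition
    // was read by two different threads.
    static boolean oneToOne(Map<String, Set<Integer>> seen) {
        Set<Integer> claimed = new HashSet<>();
        for (Set<Integer> partitions : seen.values()) {
            if (partitions.size() != 1) {
                return false;
            }
            if (!claimed.add(partitions.iterator().next())) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        String prefix = "[org.springframework.kafka.KafkaListenerEndpointContainer#0-";
        String rest = "-C-1][9980][e.e.s.l.KafkaConsumer:][][] Consumed JSon message Read from partition";
        List<String> lines = List.of(prefix + "3" + rest + "3", prefix + "1" + rest + "1");
        System.out.println(oneToOne(partitionsPerThread(lines)));
    }
}
```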

This doesn't make sense to me. As the documentation says:

Consumer with many threads: If processing a record takes a while, a single Consumer can run multiple threads to process records, but it is harder to manage offset for each Thread/Task. If one consumer runs multiple threads, then two messages on the same partitions could be processed by two different threads which make it hard to guarantee record delivery order without complex thread coordination. This setup might be appropriate if processing a single task takes a long time, but try to avoid it.

Can anyone explain what is happening? What is the difference between 5 consumers in the same consumer group reading 5 partitions and 1 consumer with 5 threads reading 5 partitions?

For now, they look the same to me.

Thank you!

1 Answer

You should NEVER perform async processing; using 5 consumers, each getting one partition, is the right approach and the only way of guaranteeing in-order processing.

Using multiple threads from a single consumer causes problems with offset management.

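Setting the concurrency to 5 creates 5 consumers, each running on its own thread. That is why each thread in your log reads from exactly one partition.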

This doesn't make sense to me. As the documentation says:

Which "documentation"?

The final sentence is correct:

This setup might be appropriate if processing a single task takes a long time, but try to avoid it.

With the important word being might; but I would say NEVER.
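
To illustrate why one consumer per partition keeps the order, here is a plain-Java model, not Spring Kafka's implementation. It assumes one single-threaded worker per partition, which is roughly what a concurrency of 5 gives you for 5 partitions. The class PerPartitionWorkersSketch and the record names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Model: each partition is owned by exactly one single-threaded worker.
final class PerPartitionWorkersSketch {

    static Map<Integer, List<String>> process(Map<Integer, List<String>> recordsByPartition) {
        Map<Integer, List<String>> processed = new ConcurrentHashMap<>();
        List<ExecutorService> workers = new ArrayList<>();

        for (Map.Entry<Integer, List<String>> entry : recordsByPartition.entrySet()) {
            // A single-threaded executor runs its tasks strictly in submission order.
            ExecutorService worker = Executors.newSingleThreadExecutor();
            workers.add(worker);
            List<String> done = new CopyOnWriteArrayList<>();
            processed.put(entry.getKey(), done);
            for (String record : entry.getValue()) {
                worker.execute(() -> done.add(record));
            }
        }

        for (ExecutorService worker : workers) {
            worker.shutdown();
            try {
                worker.awaitTermination(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for workers", e);
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> input =
                Map.of(0, List.of("AAAA", "BBBB"), 1, List.of("CCCC", "DDDD"));
        System.out.println(process(input));
    }
}
```

Partitions are processed in parallel with each other, but within a partition the records are always handled in the order they were read.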