I set up a Kafka ConsumerFactory according to the Spring Kafka documentation. However, the groupId doesn't seem to be used. Maybe I am just getting the whole thing wrong, so I wanted to describe what I experienced.

This is my configuration that doesn't seem to work:

@Bean
ConsumerFactory<String, KafkaEvent> kafkaEventConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
            getConsumerProperties(),
            new StringDeserializer(),
            new JsonDeserializer<>(KafkaEvent.class));
}

Map<String, Object> getConsumerProperties() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // TODO
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);


    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);

    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 45000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 70000);

    return props;
}

And I have a @KafkaListener class (KafkaEventListener) configured like this, without specifying the groupId explicitly again:

@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC)
public class KafkaEventListener {

   @Autowired
   private ConsumerFactory<String, KafkaEvent> consumerFactory;

   @KafkaHandler
   public void listenTo(@Payload KafkaEvent event) {
       LOGGER.error(LogMarker.KAFKA, consumerFactory.getConfigurationProperties().toString());
   }

}

I can see that my groupId "myGroupId" is contained in the configuration properties written to the error log above. What makes me suspicious, however, is the logging of the ConsumerCoordinator, which always reports joining a different group, and I doubt that this is correct.

2017-09-04 15:28:13.904 (    ) INFO consumer.internals.AbstractCoordinator             - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40
2017-09-04 15:28:13.904 (    ) INFO consumer.internals.AbstractCoordinator             - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40
2017-09-04 15:28:13.906 (    ) INFO consumer.internals.ConsumerCoordinator             - Setting newly assigned partitions [] for group org.springframework.kafka.KafkaListenerEndpointContainer#0
2017-09-04 15:28:13.907 (    ) INFO consumer.internals.ConsumerCoordinator             - Setting newly assigned partitions [my-topic-0] for group org.springframework.kafka.KafkaListenerEndpointContainer#0

The ConsumerConfig is also logged on Spring startup. There I can see that the groupId is wrong, while the other attributes are taken over correctly.

As far as I understand, I can set the groupId globally either by setting it on the ConsumerFactory or by setting spring.kafka.consumer.group-id in application.properties. Neither variant works, though.

Only when I configure the groupId on the @KafkaListener annotation does the log state that the consumer joined the correct group:

2017-09-04 15:38:30.787 (    ) DEBUG consumer.internals.AbstractCoordinator             - Received successful JoinGroup response for group myGroupId: org.apache.kafka.common.requests.JoinGroupResponse@4c51c449

With this config:

@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC, groupId = "myGroupId")

We are using Spring Boot 2.0.0.M3 (and thus Spring Kafka 2.0.0.M3).

1 Answer

It's a bug in M3; it is fixed on master (2.0.0.BUILD-SNAPSHOT) and in 1.3.0.M2. We are expecting to release the 2.0.0.RC1 release candidate later this week (waiting for the Spring Framework RC4).
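
Until you can upgrade, setting groupId on the annotation, as you already found, is a workable workaround. For orientation, here is a rough plain-Java model of the resolution order I would expect once the fix is in: an explicit groupId on the annotation wins, then an explicitly supplied listener id (when it is meant to double as the group), and otherwise the group.id from the consumer factory properties. This is only a sketch based on my reading of the reference documentation, not Spring Kafka's actual code; the class GroupIdResolution, the method resolveGroupId and its parameters are hypothetical names, and "orderListener" and "otherGroup" are made-up values.

```java
import java.util.HashMap;
import java.util.Map;

public class GroupIdResolution {

    /**
     * Hypothetical helper that models the expected precedence.
     * It is an illustration only and does not call into Spring Kafka.
     */
    public static String resolveGroupId(String annotationGroupId,
                                        String explicitId,
                                        boolean idIsGroup,
                                        Map<String, Object> factoryProps) {
        // 1. An explicit groupId on the annotation takes precedence.
        if (annotationGroupId != null && !annotationGroupId.isEmpty()) {
            return annotationGroupId;
        }
        // 2. An explicitly supplied listener id is used as the group
        //    only when it is meant to double as the group id.
        if (explicitId != null && !explicitId.isEmpty() && idIsGroup) {
            return explicitId;
        }
        // 3. Otherwise fall back to the factory's "group.id" property
        //    (the value of ConsumerConfig.GROUP_ID_CONFIG).
        Object fromFactory = factoryProps.get("group.id");
        return fromFactory == null ? null : fromFactory.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> props = new HashMap<>();
        props.put("group.id", "myGroupId");

        // No groupId and no explicit id on the listener: factory value is used.
        System.out.println(resolveGroupId(null, null, true, props));            // myGroupId
        // groupId set on the annotation overrides the factory value.
        System.out.println(resolveGroupId("otherGroup", null, true, props));    // otherGroup
        // Explicit listener id doubles as the group id.
        System.out.println(resolveGroupId(null, "orderListener", true, props)); // orderListener
    }
}
```

In M3 the first case goes wrong: the generated container id (org.springframework.kafka.KafkaListenerEndpointContainer#0 in your log) ends up as the group even though no id was supplied on the listener.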