I set up a Kafka ConsumerFactory according to the Spring Kafka documentation. However, the groupId doesn't seem to be used. Maybe I am just getting the whole thing wrong, so I wanted to describe what I experienced.
This is my configuration that doesn't seem to work:
@Bean
ConsumerFactory<String, KafkaEvent> kafkaEventConsumerFactory() {
    return new DefaultKafkaConsumerFactory<>(
            getConsumerProperties(),
            new StringDeserializer(),
            new JsonDeserializer<>(KafkaEvent.class));
}

Map<String, Object> getConsumerProperties() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // TODO
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "myGroupId");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 3);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
    props.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 45000);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 70000);
    return props;
}
And I have a KafkaEventListener class annotated with @KafkaListener, configured like this, without specifying the groupId explicitly again:
@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC)
public class KafkaEventListener {

    @Autowired
    private ConsumerFactory<String, KafkaEvent> consumerFactory;

    @KafkaHandler
    public void listenTo(@Payload KafkaEvent event) {
        LOGGER.error(LogMarker.KAFKA, consumerFactory.getConfigurationProperties().toString());
    }
}
I can also see that my groupId "myGroupId" is contained in the configuration properties that the listener above writes to the error log. What makes me suspicious, however, is the log output of the ConsumerCoordinator, which always reports joining a different group. I am concerned that this does not look correct.
2017-09-04 15:28:13.904 ( ) INFO consumer.internals.AbstractCoordinator - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40
2017-09-04 15:28:13.904 ( ) INFO consumer.internals.AbstractCoordinator - Successfully joined group org.springframework.kafka.KafkaListenerEndpointContainer#0 with generation 40
2017-09-04 15:28:13.906 ( ) INFO consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [] for group org.springframework.kafka.KafkaListenerEndpointContainer#0
2017-09-04 15:28:13.907 ( ) INFO consumer.internals.ConsumerCoordinator - Setting newly assigned partitions [my-topic-0] for group org.springframework.kafka.KafkaListenerEndpointContainer#0
Also, on Spring startup the ConsumerConfig is printed. There I can see that the groupId is wrong, while the other attributes are taken over correctly.
As far as I understand, I can set the groupId globally either by setting it on the ConsumerFactory or by setting spring.kafka.consumer.group-id in application.properties. Neither variant works, though.
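For reference, the application.properties variant I tried looks roughly like this. As far as I know, Spring Boot applies this property to its auto-configured ConsumerFactory, so it may not reach a ConsumerFactory bean that I define myself; please treat that as my assumption rather than a confirmed cause.

```properties
# application.properties: group id for Spring Boot's auto-configured consumer
spring.kafka.consumer.group-id=myGroupId
```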
Only when I configure the groupId on the @KafkaListener annotation does the log state that the consumer joined the correct group:
2017-09-04 15:38:30.787 ( ) DEBUG consumer.internals.AbstractCoordinator - Received successful JoinGroup response for group myGroupId: org.apache.kafka.common.requests.JoinGroupResponse@4c51c449
With this config:
@KafkaListener(topics = KafkaEventPublisher.ORDER_TOPIC, groupId = "myGroupId")
We are using Spring Boot 2.0.0.M3 (and thus Spring Kafka 2.0.0.M3).
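To make the difference between what I observe and what I expected more concrete, here is a small plain-Java sketch. It does not use Spring at all and is not Spring Kafka's actual code; the class GroupIdSketch and the methods observed and expected are names I made up purely to model the behaviour described in this post.

```java
// Hypothetical model of the group selection described in this post.
// NOT Spring Kafka code: all names here are invented for illustration.
class GroupIdSketch {

    // What the logs suggest: the groupId from @KafkaListener wins if set;
    // otherwise the generated container id is used and the group.id
    // configured on the ConsumerFactory is ignored.
    static String observed(String annotationGroupId, String containerId, String factoryGroupId) {
        if (annotationGroupId != null && !annotationGroupId.isEmpty()) {
            return annotationGroupId;
        }
        return containerId;
    }

    // What I expected: without a groupId on the annotation, fall back to
    // the group.id configured on the ConsumerFactory.
    static String expected(String annotationGroupId, String containerId, String factoryGroupId) {
        if (annotationGroupId != null && !annotationGroupId.isEmpty()) {
            return annotationGroupId;
        }
        return factoryGroupId;
    }

    public static void main(String[] args) {
        String containerId = "org.springframework.kafka.KafkaListenerEndpointContainer#0";
        // No groupId on the annotation: the two models disagree.
        System.out.println("observed: " + observed("", containerId, "myGroupId"));
        System.out.println("expected: " + expected("", containerId, "myGroupId"));
        // groupId on the annotation: both models agree.
        System.out.println("explicit: " + observed("myGroupId", containerId, "myGroupId"));
    }
}
```

The first two lines printed by main correspond to my first configuration (group taken from the container id versus the "myGroupId" I expected), and the third corresponds to the configuration with groupId set on @KafkaListener.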