0
votes

I'm using Kafka with Spring Boot. I use Rest Controllers to call Producer/Consumer API's. Producer Class is able to add messages to the topic. I verified using command line utility (Console-consumer.sh). However my Consumer class is not able to receive them in Java for further processing. @KafkaListener used in Consumer class listener method should be able to receive messages when my Producer class posts messages to the topic which is not happening. Any help appreciated.

Is it still necessary for consumer to subscribe and poll for records when I have already created KafkaListenerContainerFactory that is responsible for invoking Consumer Listener method when a message is posted to the topic?

Consumer Class

@Component
public class KafkaListenersExample {

    private final List<KafkaPayload> messages = new ArrayList<>();

    @KafkaListener(topics = "test_topic", containerFactory = "kafkaListenerContainerFactory")
    public void listener(KafkaPayload data) {
        synchronized (messages){
            messages.add(data);
        }
        //System.out.println("message from kafka :"+data);
    }

    public List<KafkaPayload> getMessages(){
        return messages;
    }
}

Consumer Config

@Configuration
class KafkaConsumerConfig {

    private String bootstrapServers = "localhost:9092";
 

    @Bean
    public ConsumerFactory<String, KafkaPayload> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
                bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props) ;
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, KafkaPayload> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, KafkaPayload> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerConfigs());
        return factory;
    }
}
1

1 Answers

0
votes

The listener container creates the consumer, subscribes, and takes care of the polling.

Turning on DEBUG logging should help determine what's wrong.

If the records are already in the topic, you need to set ConsumerConfig.AUTO_OFFSET_RESET_CONFIG to earliest. Otherwise, the consumer starts consuming from the end of the topic (latest).